add configurable ColumnTypeMergePolicy to SegmentMetadataCache (#14319)

This PR adds a new interface to control how SegmentMetadataCache chooses ColumnType when faced with differences between segments for SQL schemas which are computed, exposed as druid.sql.planner.metadataColumnTypeMergePolicy and adds a new 'least restrictive type' mode to allow choosing the type that data across all segments can best be coerced into and sets this as the default behavior.

This is a behavior change around when segment driven schema migrations take effect for the SQL schema. With latestInterval, the SQL schema will be updated as soon as the first job with the new schema has published segments, while using leastRestrictive, the schema will only be updated once all segments are reindexed to the new type. The benefit of leastRestrictive is that it eliminates a bunch of type coercion errors that can happen in SQL when types are varied across segments with latestInterval because the newest type is not able to correctly represent older data, such as if the segments have a mix of ARRAY and number types, or any other combinations that lead to odd query plans.
This commit is contained in:
Clint Wylie 2023-05-24 08:02:51 -07:00 committed by GitHub
parent 22ba457d29
commit 4096f51f0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 461 additions and 157 deletions

View File

@ -1953,6 +1953,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true| |`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000| |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M| |`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
|`druid.sql.planner.metadataColumnTypeMergePolicy`|Defines how column types will be chosen when faced with differences between segments when computing the SQL schema. Options are specified as a JSON object, with valid choices of `leastRestrictive` or `latestInterval`. For `leastRestrictive`, Druid will automatically widen the type computed for the schema to a type which data across all segments can be converted into, however planned schema migrations can only take effect once all segments have been re-ingested to the new schema. With `latestInterval`, the column type in most recent time chunks defines the type for the schema. |`leastRestrictive`|
|`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|true| |`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|true|
|`druid.sql.planner.useGroupingSetForExactDistinct`|Only relevant when `useApproximateCountDistinct` is disabled. If set to true, exact distinct queries are re-written using grouping sets. Otherwise, exact distinct queries are re-written using joins. This should be set to true for group by query with multiple exact distinct aggregations. This flag can be overridden per query.|false| |`druid.sql.planner.useGroupingSetForExactDistinct`|Only relevant when `useApproximateCountDistinct` is disabled. If set to true, exact distinct queries are re-written using grouping sets. Otherwise, exact distinct queries are re-written using joins. This should be set to true for group by query with multiple exact distinct aggregations. This flag can be overridden per query.|false|
|`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](../querying/groupbyquery.md) will be used instead.|true| |`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](../querying/groupbyquery.md) will be used instead.|true|

View File

@ -19,7 +19,6 @@
package org.apache.druid.math.expr; package org.apache.druid.math.expr;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.Types; import org.apache.druid.segment.column.Types;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -83,7 +82,7 @@ public class ExpressionTypeConversion
} }
if (type.isArray() || other.isArray()) { if (type.isArray() || other.isArray()) {
if (!Objects.equals(type, other)) { if (!Objects.equals(type, other)) {
throw new IAE("Cannot implicitly cast %s to %s", type, other); throw new Types.IncompatibleTypeException(type, other);
} }
return type; return type;
} }
@ -95,7 +94,7 @@ public class ExpressionTypeConversion
return type; return type;
} }
if (!Objects.equals(type, other)) { if (!Objects.equals(type, other)) {
throw new IAE("Cannot implicitly cast %s to %s", type, other); throw new Types.IncompatibleTypeException(type, other);
} }
return type; return type;
} }
@ -128,7 +127,7 @@ public class ExpressionTypeConversion
// arrays cannot be auto converted // arrays cannot be auto converted
if (type.isArray() || other.isArray()) { if (type.isArray() || other.isArray()) {
if (!Objects.equals(type, other)) { if (!Objects.equals(type, other)) {
throw new IAE("Cannot implicitly cast %s to %s", type, other); throw new Types.IncompatibleTypeException(type, other);
} }
return type; return type;
} }
@ -140,7 +139,7 @@ public class ExpressionTypeConversion
return type; return type;
} }
if (!Objects.equals(type, other)) { if (!Objects.equals(type, other)) {
throw new IAE("Cannot implicitly cast %s to %s", type, other); throw new Types.IncompatibleTypeException(type, other);
} }
return type; return type;
} }
@ -177,7 +176,7 @@ public class ExpressionTypeConversion
return ExpressionTypeFactory.getInstance().ofArray(newElementType); return ExpressionTypeFactory.getInstance().ofArray(newElementType);
} }
throw new IAE("Cannot implicitly cast %s to %s", type, other); throw new Types.IncompatibleTypeException(type, other);
} }

View File

@ -93,6 +93,9 @@ public class NestedDataColumnMerger implements DimensionMergerV9
final IndexableAdapter.NestedColumnMergable mergable = closer.register( final IndexableAdapter.NestedColumnMergable mergable = closer.register(
adapter.getNestedColumnMergeables(name) adapter.getNestedColumnMergeables(name)
); );
if (mergable == null) {
continue;
}
final SortedValueDictionary dimValues = mergable.getValueDictionary(); final SortedValueDictionary dimValues = mergable.getValueDictionary();
boolean allNulls = dimValues == null || dimValues.allNull(); boolean allNulls = dimValues == null || dimValues.allNull();

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -152,7 +151,7 @@ public class ColumnType extends BaseTypeSignature<ValueType>
* inference * inference
*/ */
@Nullable @Nullable
public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullable ColumnType other) public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullable ColumnType other) throws Types.IncompatibleTypeException
{ {
if (type == null) { if (type == null) {
return other; return other;
@ -168,7 +167,7 @@ public class ColumnType extends BaseTypeSignature<ValueType>
return type; return type;
} }
if (!Objects.equals(type, other)) { if (!Objects.equals(type, other)) {
throw new IAE("Cannot implicitly cast %s to %s", type, other); throw new Types.IncompatibleTypeException(type, other);
} }
return type; return type;
} }
@ -177,7 +176,7 @@ public class ColumnType extends BaseTypeSignature<ValueType>
if (ColumnType.NESTED_DATA.equals(type) || ColumnType.NESTED_DATA.equals(other)) { if (ColumnType.NESTED_DATA.equals(type) || ColumnType.NESTED_DATA.equals(other)) {
return ColumnType.NESTED_DATA; return ColumnType.NESTED_DATA;
} }
throw new IAE("Cannot implicitly cast %s to %s", type, other); throw new Types.IncompatibleTypeException(type, other);
} }
// arrays convert based on least restrictive element type // arrays convert based on least restrictive element type
@ -186,11 +185,13 @@ public class ColumnType extends BaseTypeSignature<ValueType>
return type; return type;
} }
final ColumnType commonElementType; final ColumnType commonElementType;
// commonElementType cannot be null if we got this far, we always return a value unless both args are null
if (other.isArray()) { if (other.isArray()) {
commonElementType = leastRestrictiveType( commonElementType = leastRestrictiveType(
(ColumnType) type.getElementType(), (ColumnType) type.getElementType(),
(ColumnType) other.getElementType() (ColumnType) other.getElementType()
); );
return ColumnType.ofArray(commonElementType); return ColumnType.ofArray(commonElementType);
} else { } else {
commonElementType = leastRestrictiveType( commonElementType = leastRestrictiveType(
@ -218,13 +219,14 @@ public class ColumnType extends BaseTypeSignature<ValueType>
} }
// all numbers win over longs // all numbers win over longs
// floats vs doubles would be handled here, but we currently only support doubles...
if (Types.is(type, ValueType.LONG) && Types.isNullOr(other, ValueType.LONG)) { if (Types.is(type, ValueType.LONG) && Types.isNullOr(other, ValueType.LONG)) {
return ColumnType.LONG; return ColumnType.LONG;
} }
// doubles win over floats
if (Types.is(type, ValueType.FLOAT) && Types.isNullOr(other, ValueType.FLOAT)) { if (Types.is(type, ValueType.FLOAT) && Types.isNullOr(other, ValueType.FLOAT)) {
return ColumnType.FLOAT; return ColumnType.FLOAT;
} }
return ColumnType.DOUBLE; return ColumnType.DOUBLE;
} }
} }

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.column; package org.apache.druid.segment.column;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -112,4 +113,12 @@ public class Types
return (typeSignature1 != null && typeSignature1.is(typeDescriptor)) || return (typeSignature1 != null && typeSignature1.is(typeDescriptor)) ||
(typeSignature2 != null && typeSignature2.is(typeDescriptor)); (typeSignature2 != null && typeSignature2.is(typeDescriptor));
} }
public static class IncompatibleTypeException extends IAE
{
public IncompatibleTypeException(TypeSignature<?> type, TypeSignature<?> other)
{
super("Cannot implicitly cast [%s] to [%s]", type, other);
}
}
} }

View File

@ -163,8 +163,8 @@ public class ColumnTypeTest
Assert.assertEquals(ColumnType.NESTED_DATA, ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, ColumnType.UNKNOWN_COMPLEX)); Assert.assertEquals(ColumnType.NESTED_DATA, ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, ColumnType.UNKNOWN_COMPLEX));
Assert.assertEquals(SOME_COMPLEX, ColumnType.leastRestrictiveType(SOME_COMPLEX, ColumnType.UNKNOWN_COMPLEX)); Assert.assertEquals(SOME_COMPLEX, ColumnType.leastRestrictiveType(SOME_COMPLEX, ColumnType.UNKNOWN_COMPLEX));
Assert.assertEquals(SOME_COMPLEX, ColumnType.leastRestrictiveType(ColumnType.UNKNOWN_COMPLEX, SOME_COMPLEX)); Assert.assertEquals(SOME_COMPLEX, ColumnType.leastRestrictiveType(ColumnType.UNKNOWN_COMPLEX, SOME_COMPLEX));
Assert.assertThrows(IllegalArgumentException.class, () -> ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, SOME_COMPLEX)); Assert.assertThrows(Types.IncompatibleTypeException.class, () -> ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, SOME_COMPLEX));
Assert.assertThrows(IllegalArgumentException.class, () -> ColumnType.leastRestrictiveType(ColumnType.STRING_ARRAY, SOME_COMPLEX)); Assert.assertThrows(Types.IncompatibleTypeException.class, () -> ColumnType.leastRestrictiveType(ColumnType.STRING_ARRAY, SOME_COMPLEX));
} }
static class SomeOtherTypeSignature extends BaseTypeSignature<ValueType> static class SomeOtherTypeSignature extends BaseTypeSignature<ValueType>

View File

@ -31,6 +31,8 @@ import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
*/ */
public class CalcitePlannerModule implements Module public class CalcitePlannerModule implements Module
{ {
public static final String CONFIG_BASE = "druid.sql.planner";
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
{ {
@ -38,8 +40,8 @@ public class CalcitePlannerModule implements Module
// so both configs are bound to the same property prefix. // so both configs are bound to the same property prefix.
// It turns out that the order of the arguments above is misleading. // It turns out that the order of the arguments above is misleading.
// We're actually binding the class to the config prefix, not the other way around. // We're actually binding the class to the config prefix, not the other way around.
JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class); JsonConfigProvider.bind(binder, CONFIG_BASE, PlannerConfig.class);
JsonConfigProvider.bind(binder, "druid.sql.planner", SegmentMetadataCacheConfig.class); JsonConfigProvider.bind(binder, CONFIG_BASE, SegmentMetadataCacheConfig.class);
binder.bind(PlannerFactory.class).in(LazySingleton.class); binder.bind(PlannerFactory.class).in(LazySingleton.class);
binder.bind(DruidOperatorTable.class).in(LazySingleton.class); binder.bind(DruidOperatorTable.class).in(LazySingleton.class);
Multibinder.newSetBinder(binder, ExtensionCalciteRuleProvider.class); Multibinder.newSetBinder(binder, ExtensionCalciteRuleProvider.class);

View File

@ -20,10 +20,9 @@
package org.apache.druid.sql.calcite.planner; package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.sql.calcite.schema.SegmentMetadataCache;
import org.joda.time.Period; import org.joda.time.Period;
import java.util.Objects;
/** /**
* Configuration properties for the Broker-side cache of segment metadata * Configuration properties for the Broker-side cache of segment metadata
* used to infer datasources for SQL. This class shares the same config root * used to infer datasources for SQL. This class shares the same config root
@ -32,6 +31,9 @@ import java.util.Objects;
*/ */
public class SegmentMetadataCacheConfig public class SegmentMetadataCacheConfig
{ {
@JsonProperty
private boolean awaitInitializationOnStart = true;
@JsonProperty @JsonProperty
private boolean metadataSegmentCacheEnable = false; private boolean metadataSegmentCacheEnable = false;
@ -42,7 +44,8 @@ public class SegmentMetadataCacheConfig
private Period metadataRefreshPeriod = new Period("PT1M"); private Period metadataRefreshPeriod = new Period("PT1M");
@JsonProperty @JsonProperty
private boolean awaitInitializationOnStart = true; private SegmentMetadataCache.ColumnTypeMergePolicy metadataColumnTypeMergePolicy =
new SegmentMetadataCache.LeastRestrictiveTypeMergePolicy();
public static SegmentMetadataCacheConfig create() public static SegmentMetadataCacheConfig create()
{ {
@ -78,31 +81,9 @@ public class SegmentMetadataCacheConfig
return metadataSegmentPollPeriod; return metadataSegmentPollPeriod;
} }
@Override public SegmentMetadataCache.ColumnTypeMergePolicy getMetadataColumnTypeMergePolicy()
public boolean equals(final Object o)
{ {
if (this == o) { return metadataColumnTypeMergePolicy;
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SegmentMetadataCacheConfig that = (SegmentMetadataCacheConfig) o;
return awaitInitializationOnStart == that.awaitInitializationOnStart &&
metadataSegmentCacheEnable == that.metadataSegmentCacheEnable &&
metadataSegmentPollPeriod == that.metadataSegmentPollPeriod &&
Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod);
}
@Override
public int hashCode()
{
return Objects.hash(
metadataRefreshPeriod,
awaitInitializationOnStart,
metadataSegmentCacheEnable,
metadataSegmentPollPeriod
);
} }
@Override @Override
@ -113,6 +94,7 @@ public class SegmentMetadataCacheConfig
", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable + ", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable +
", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod + ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod +
", awaitInitializationOnStart=" + awaitInitializationOnStart + ", awaitInitializationOnStart=" + awaitInitializationOnStart +
", metadataColumnTypeMergePolicy=" + metadataColumnTypeMergePolicy +
'}'; '}';
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema; package org.apache.druid.sql.calcite.schema;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicates; import com.google.common.base.Predicates;
@ -36,6 +37,7 @@ import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
@ -56,6 +58,7 @@ import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SegmentManager;
@ -75,6 +78,7 @@ import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
@ -107,7 +111,6 @@ public class SegmentMetadataCache
private static final int MAX_SEGMENTS_PER_QUERY = 15000; private static final int MAX_SEGMENTS_PER_QUERY = 15000;
private static final long DEFAULT_NUM_ROWS = 0; private static final long DEFAULT_NUM_ROWS = 0;
private static final Interner<RowSignature> ROW_SIGNATURE_INTERNER = Interners.newWeakInterner(); private static final Interner<RowSignature> ROW_SIGNATURE_INTERNER = Interners.newWeakInterner();
private final QueryLifecycleFactory queryLifecycleFactory; private final QueryLifecycleFactory queryLifecycleFactory;
private final SegmentMetadataCacheConfig config; private final SegmentMetadataCacheConfig config;
// Escalator, so we can attach an authentication result to queries we generate. // Escalator, so we can attach an authentication result to queries we generate.
@ -117,6 +120,7 @@ public class SegmentMetadataCache
private final ExecutorService cacheExec; private final ExecutorService cacheExec;
private final ExecutorService callbackExec; private final ExecutorService callbackExec;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final ColumnTypeMergePolicy columnTypeMergePolicy;
/** /**
* Map of DataSource -> DruidTable. * Map of DataSource -> DruidTable.
@ -229,6 +233,7 @@ public class SegmentMetadataCache
this.segmentManager = segmentManager; this.segmentManager = segmentManager;
this.joinableFactory = joinableFactory; this.joinableFactory = joinableFactory;
this.config = Preconditions.checkNotNull(config, "config"); this.config = Preconditions.checkNotNull(config, "config");
this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();
this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d"); this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
this.escalator = escalator; this.escalator = escalator;
@ -803,25 +808,11 @@ public class SegmentMetadataCache
final RowSignature rowSignature = availableSegmentMetadata.getRowSignature(); final RowSignature rowSignature = availableSegmentMetadata.getRowSignature();
if (rowSignature != null) { if (rowSignature != null) {
for (String column : rowSignature.getColumnNames()) { for (String column : rowSignature.getColumnNames()) {
// Newer column types should override older ones.
final ColumnType columnType = final ColumnType columnType =
rowSignature.getColumnType(column) rowSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column [%s]", column)); .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
columnTypes.compute(column, (c, existingType) -> { columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
if (existingType == null) {
return columnType;
}
if (columnType == null) {
return existingType;
}
// if any are json, are all json
if (ColumnType.NESTED_DATA.equals(columnType) || ColumnType.NESTED_DATA.equals(existingType)) {
return ColumnType.NESTED_DATA;
}
// "existing type" is the 'newest' type, since we iterate the segments list by newest start time
return existingType;
});
} }
} }
} }
@ -995,4 +986,126 @@ public class SegmentMetadataCache
runnable.run(); runnable.run();
} }
} }
/**
* ColumnTypeMergePolicy defines the rules of which type to use when faced with the possibility of different types
* for the same column from segment to segment. It is used to help compute a {@link RowSignature} for a table in
* Druid based on the segment metadata of all segments, merging the types of each column encountered to end up with
* a single type to represent it globally.
*/
@FunctionalInterface
public interface ColumnTypeMergePolicy
{
ColumnType merge(ColumnType existingType, ColumnType newType);
@JsonCreator
static ColumnTypeMergePolicy fromString(String type)
{
if (LeastRestrictiveTypeMergePolicy.NAME.equalsIgnoreCase(type)) {
return LeastRestrictiveTypeMergePolicy.INSTANCE;
}
if (FirstTypeMergePolicy.NAME.equalsIgnoreCase(type)) {
return FirstTypeMergePolicy.INSTANCE;
}
throw new IAE("Unknown type [%s]", type);
}
}
/**
* Classic logic, we use the first type we encounter. This policy is effectively 'newest first' because we iterated
* segments starting from the most recent time chunk, so this typically results in the most recently used type being
* chosen, at least for systems that are continuously updated with 'current' data.
*
* Since {@link ColumnTypeMergePolicy} are used to compute the SQL schema, at least in systems using SQL schemas which
* are partially or fully computed by this cache, this merge policy can result in query time errors if incompatible
* types are mixed if the chosen type is more restrictive than the types of some segments. If data is likely to vary
* in type across segments, consider using {@link LeastRestrictiveTypeMergePolicy} instead.
*/
public static class FirstTypeMergePolicy implements ColumnTypeMergePolicy
{
public static final String NAME = "latestInterval";
private static final FirstTypeMergePolicy INSTANCE = new FirstTypeMergePolicy();
@Override
public ColumnType merge(ColumnType existingType, ColumnType newType)
{
if (existingType == null) {
return newType;
}
if (newType == null) {
return existingType;
}
// if any are json, are all json
if (ColumnType.NESTED_DATA.equals(newType) || ColumnType.NESTED_DATA.equals(existingType)) {
return ColumnType.NESTED_DATA;
}
// "existing type" is the 'newest' type, since we iterate the segments list by newest start time
return existingType;
}
@Override
public int hashCode()
{
return Objects.hash(NAME);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
return o != null && getClass() == o.getClass();
}
@Override
public String toString()
{
return NAME;
}
}
/**
* Resolves types using {@link ColumnType#leastRestrictiveType(ColumnType, ColumnType)} to find the ColumnType that
* can best represent all data contained across all segments.
*/
public static class LeastRestrictiveTypeMergePolicy implements ColumnTypeMergePolicy
{
public static final String NAME = "leastRestrictive";
private static final LeastRestrictiveTypeMergePolicy INSTANCE = new LeastRestrictiveTypeMergePolicy();
@Override
public ColumnType merge(ColumnType existingType, ColumnType newType)
{
try {
return ColumnType.leastRestrictiveType(existingType, newType);
}
catch (Types.IncompatibleTypeException incompatibleTypeException) {
// fall back to first encountered type if they are not compatible for some reason
return FirstTypeMergePolicy.INSTANCE.merge(existingType, newType);
}
}
@Override
public int hashCode()
{
return Objects.hash(NAME);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
return o != null && getClass() == o.getClass();
}
@Override
public String toString()
{
return NAME;
}
}
} }

View File

@ -19,12 +19,17 @@
package org.apache.druid.sql.calcite.planner; package org.apache.druid.sql.calcite.planner;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.sql.calcite.schema.SegmentMetadataCache;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import java.util.Properties;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/** /**
* Pathetic little unit test just to keep Jacoco happy. * Pathetic little unit test just to keep Jacoco happy.
@ -32,19 +37,60 @@ import static org.junit.Assert.assertTrue;
public class SegmentMetadataCacheConfigTest public class SegmentMetadataCacheConfigTest
{ {
@Test @Test
public void testConfig() public void testDefaultConfig()
{ {
SegmentMetadataCacheConfig config = SegmentMetadataCacheConfig.create("PT1M"); final Injector injector = createInjector();
assertEquals(Period.minutes(1), config.getMetadataRefreshPeriod()); final JsonConfigProvider<SegmentMetadataCacheConfig> provider = JsonConfigProvider.of(
assertTrue(config.isAwaitInitializationOnStart()); CalcitePlannerModule.CONFIG_BASE,
// Not legal per IntelliJ inspections. Should be testable, but IntelliJ SegmentMetadataCacheConfig.class
// won't allow this code. );
//assertTrue(config.equals(config)); final Properties properties = new Properties();
// Workaround provider.inject(properties, injector.getInstance(JsonConfigurator.class));
assertTrue(config.equals(SegmentMetadataCacheConfig.create("PT1M"))); final SegmentMetadataCacheConfig config = provider.get();
assertFalse(config.equals(null)); Assert.assertTrue(config.isAwaitInitializationOnStart());
assertTrue(config.equals(SegmentMetadataCacheConfig.create("PT1M"))); Assert.assertFalse(config.isMetadataSegmentCacheEnable());
assertFalse(config.equals(SegmentMetadataCacheConfig.create("PT2M"))); Assert.assertEquals(Period.minutes(1), config.getMetadataRefreshPeriod());
assertTrue(config.hashCode() != 0); Assert.assertEquals(60_000, config.getMetadataSegmentPollPeriod());
Assert.assertEquals(new SegmentMetadataCache.LeastRestrictiveTypeMergePolicy(), config.getMetadataColumnTypeMergePolicy());
}
@Test
public void testCustomizedConfig()
{
final Injector injector = createInjector();
final JsonConfigProvider<SegmentMetadataCacheConfig> provider = JsonConfigProvider.of(
CalcitePlannerModule.CONFIG_BASE,
SegmentMetadataCacheConfig.class
);
final Properties properties = new Properties();
properties.setProperty(
CalcitePlannerModule.CONFIG_BASE + ".metadataColumnTypeMergePolicy",
"latestInterval"
);
properties.setProperty(CalcitePlannerModule.CONFIG_BASE + ".metadataRefreshPeriod", "PT2M");
properties.setProperty(CalcitePlannerModule.CONFIG_BASE + ".metadataSegmentPollPeriod", "15000");
properties.setProperty(CalcitePlannerModule.CONFIG_BASE + ".metadataSegmentCacheEnable", "true");
properties.setProperty(CalcitePlannerModule.CONFIG_BASE + ".awaitInitializationOnStart", "false");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final SegmentMetadataCacheConfig config = provider.get();
Assert.assertFalse(config.isAwaitInitializationOnStart());
Assert.assertTrue(config.isMetadataSegmentCacheEnable());
Assert.assertEquals(Period.minutes(2), config.getMetadataRefreshPeriod());
Assert.assertEquals(15_000, config.getMetadataSegmentPollPeriod());
Assert.assertEquals(
new SegmentMetadataCache.FirstTypeMergePolicy(),
config.getMetadataColumnTypeMergePolicy()
);
}
private Injector createInjector()
{
return GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
binder -> {
JsonConfigProvider.bind(binder, CalcitePlannerModule.CONFIG_BASE, SegmentMetadataCacheConfig.class);
}
)
);
} }
} }

View File

@ -32,6 +32,10 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
@ -64,10 +68,12 @@ import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig;
import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
@ -137,6 +143,78 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
) )
.rows(ROWS2) .rows(ROWS2)
.buildMMappedIndex(); .buildMMappedIndex();
final InputRowSchema rowSchema = new InputRowSchema(
new TimestampSpec("t", null, null),
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
null
);
final List<InputRow> autoRows1 = ImmutableList.of(
TestDataBuilder.createRow(
ImmutableMap.<String, Object>builder()
.put("t", "2023-01-01T00:00Z")
.put("numbery", 1.1f)
.put("numberyArrays", ImmutableList.of(1L, 2L, 3L))
.put("stringy", ImmutableList.of("a", "b", "c"))
.put("array", ImmutableList.of(1.1, 2.2, 3.3))
.put("nested", ImmutableMap.of("x", 1L, "y", 2L))
.build(),
rowSchema
)
);
final List<InputRow> autoRows2 = ImmutableList.of(
TestDataBuilder.createRow(
ImmutableMap.<String, Object>builder()
.put("t", "2023-01-02T00:00Z")
.put("numbery", 1L)
.put("numberyArrays", ImmutableList.of(3.3, 2.2, 3.1))
.put("stringy", "a")
.put("array", ImmutableList.of(1L, 2L, 3L))
.put("nested", "hello")
.build(),
rowSchema
)
);
final QueryableIndex indexAuto1 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "1"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
new IncrementalIndexSchema.Builder()
.withTimestampSpec(rowSchema.getTimestampSpec())
.withDimensionsSpec(rowSchema.getDimensionsSpec())
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
)
.withRollup(false)
.build()
)
.rows(autoRows1)
.buildMMappedIndex();
final QueryableIndex indexAuto2 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "1"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
new IncrementalIndexSchema.Builder()
.withTimestampSpec(
new TimestampSpec("t", null, null)
)
.withDimensionsSpec(
DimensionsSpec.builder().useSchemaDiscovery(true).build()
)
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
)
.withRollup(false)
.build()
)
.rows(autoRows2)
.buildMMappedIndex();
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder() DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE1) .dataSource(CalciteTests.DATASOURCE1)
@ -164,6 +242,24 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
.size(0) .size(0)
.build(), .build(),
index2 index2
).add(
DataSegment.builder()
.dataSource(CalciteTests.SOME_DATASOURCE)
.interval(Intervals.of("2023-01-01T00Z/P1D"))
.version("1")
.shardSpec(new LinearShardSpec(1))
.size(0)
.build(),
indexAuto1
).add(
DataSegment.builder()
.dataSource(CalciteTests.SOME_DATASOURCE)
.interval(Intervals.of("2023-01-02T00Z/P1D"))
.version("1")
.shardSpec(new LinearShardSpec(1))
.size(0)
.build(),
indexAuto2
); );
final DataSegment segment1 = new DataSegment( final DataSegment segment1 = new DataSegment(
"foo3", "foo3",
@ -183,7 +279,12 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
druidServers = serverView.getDruidServers(); druidServers = serverView.getDruidServers();
} }
public SegmentMetadataCache buildSchema1() throws InterruptedException public SegmentMetadataCache buildSchemaMarkAndTableLatch() throws InterruptedException
{
return buildSchemaMarkAndTableLatch(SEGMENT_CACHE_CONFIG_DEFAULT);
}
public SegmentMetadataCache buildSchemaMarkAndTableLatch(SegmentMetadataCacheConfig config) throws InterruptedException
{ {
Preconditions.checkState(runningSchema == null); Preconditions.checkState(runningSchema == null);
runningSchema = new SegmentMetadataCache( runningSchema = new SegmentMetadataCache(
@ -194,7 +295,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
ImmutableSet.of(globalTableJoinable), ImmutableSet.of(globalTableJoinable),
ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class) ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)
), ),
SEGMENT_CACHE_CONFIG_DEFAULT, config,
new NoopEscalator(), new NoopEscalator(),
new BrokerInternalQueryConfig(), new BrokerInternalQueryConfig(),
new NoopServiceEmitter() new NoopServiceEmitter()
@ -221,58 +322,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
return runningSchema; return runningSchema;
} }
public SegmentMetadataCache buildSchema2() throws InterruptedException public SegmentMetadataCache buildSchemaMarkAndRefreshLatch() throws InterruptedException
{
Preconditions.checkState(runningSchema == null);
runningSchema = new SegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
segmentManager,
new MapJoinableFactory(
ImmutableSet.of(globalTableJoinable),
ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)
),
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig(),
new NoopServiceEmitter()
)
{
boolean throwException = true;
@Override
protected DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String dataSource)
{
DatasourceTable.PhysicalDatasourceMetadata table = super.buildDruidTable(dataSource);
buildTableLatch.countDown();
return table;
}
@Override
protected Set<SegmentId> refreshSegments(final Set<SegmentId> segments) throws IOException
{
if (throwException) {
throwException = false;
throw new RuntimeException("Query[xxxx] url[http://xxxx:8083/druid/v2/] timed out.");
} else {
return super.refreshSegments(segments);
}
}
@Override
void markDataSourceAsNeedRebuild(String datasource)
{
super.markDataSourceAsNeedRebuild(datasource);
markDataSourceLatch.countDown();
}
};
runningSchema.start();
runningSchema.awaitInitialization();
return runningSchema;
}
public SegmentMetadataCache buildSchema3() throws InterruptedException
{ {
Preconditions.checkState(runningSchema == null); Preconditions.checkState(runningSchema == null);
runningSchema = new SegmentMetadataCache( runningSchema = new SegmentMetadataCache(
@ -322,24 +372,24 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
@Test @Test
public void testGetTableMap() throws InterruptedException public void testGetTableMap() throws InterruptedException
{ {
SegmentMetadataCache schema = buildSchema1(); SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema.getDatasourceNames()); Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), schema.getDatasourceNames());
final Set<String> tableNames = schema.getDatasourceNames(); final Set<String> tableNames = schema.getDatasourceNames();
Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableNames); Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), tableNames);
} }
@Test @Test
public void testSchemaInit() throws InterruptedException public void testSchemaInit() throws InterruptedException
{ {
SegmentMetadataCache schema2 = buildSchema1(); SegmentMetadataCache schema2 = buildSchemaMarkAndTableLatch();
Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema2.getDatasourceNames()); Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), schema2.getDatasourceNames());
} }
@Test @Test
public void testGetTableMapFoo() throws InterruptedException public void testGetTableMapFoo() throws InterruptedException
{ {
SegmentMetadataCache schema = buildSchema1(); SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo"); final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo");
final DruidTable fooTable = new DatasourceTable(fooDs); final DruidTable fooTable = new DatasourceTable(fooDs);
final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl()); final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
@ -354,7 +404,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(1).getType().getSqlTypeName()); Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(1).getType().getSqlTypeName());
Assert.assertEquals("m1", fields.get(2).getName()); Assert.assertEquals("m1", fields.get(2).getName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName()); Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(2).getType().getSqlTypeName());
Assert.assertEquals("dim1", fields.get(3).getName()); Assert.assertEquals("dim1", fields.get(3).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName()); Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName());
@ -369,7 +419,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
@Test @Test
public void testGetTableMapFoo2() throws InterruptedException public void testGetTableMapFoo2() throws InterruptedException
{ {
SegmentMetadataCache schema = buildSchema1(); SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo2"); final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo2");
final DruidTable fooTable = new DatasourceTable(fooDs); final DruidTable fooTable = new DatasourceTable(fooDs);
final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl()); final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl());
@ -387,6 +437,103 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName()); Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName());
} }
@Test
public void testGetTableMapSomeTable() throws InterruptedException
{
// using 'newest first' column type merge strategy, the types are expected to be the types defined in the newer
// segment, except for json, which is special handled
SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(
new SegmentMetadataCacheConfig() {
@Override
public SegmentMetadataCache.ColumnTypeMergePolicy getMetadataColumnTypeMergePolicy()
{
return new SegmentMetadataCache.FirstTypeMergePolicy();
}
}
);
final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource(CalciteTests.SOME_DATASOURCE);
final DruidTable table = new DatasourceTable(fooDs);
final RelDataType rowType = table.getRowType(new JavaTypeFactoryImpl());
final List<RelDataTypeField> fields = rowType.getFieldList();
Assert.assertEquals(9, fields.size());
Assert.assertEquals("__time", fields.get(0).getName());
Assert.assertEquals(SqlTypeName.TIMESTAMP, fields.get(0).getType().getSqlTypeName());
Assert.assertEquals("numbery", fields.get(1).getName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(1).getType().getSqlTypeName());
Assert.assertEquals("numberyArrays", fields.get(2).getName());
Assert.assertEquals(SqlTypeName.ARRAY, fields.get(2).getType().getSqlTypeName());
Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(2).getType().getComponentType().getSqlTypeName());
Assert.assertEquals("stringy", fields.get(3).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName());
Assert.assertEquals("array", fields.get(4).getName());
Assert.assertEquals(SqlTypeName.ARRAY, fields.get(4).getType().getSqlTypeName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(4).getType().getComponentType().getSqlTypeName());
Assert.assertEquals("nested", fields.get(5).getName());
Assert.assertEquals(SqlTypeName.OTHER, fields.get(5).getType().getSqlTypeName());
Assert.assertEquals("cnt", fields.get(6).getName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(6).getType().getSqlTypeName());
Assert.assertEquals("m1", fields.get(7).getName());
Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(7).getType().getSqlTypeName());
Assert.assertEquals("unique_dim1", fields.get(8).getName());
Assert.assertEquals(SqlTypeName.OTHER, fields.get(8).getType().getSqlTypeName());
}
@Test
public void testGetTableMapSomeTableLeastRestrictiveTypeMerge() throws InterruptedException
{
// using 'least restrictive' column type merge strategy, the types are expected to be the types defined as the
// least restrictive blend across all segments
SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource(CalciteTests.SOME_DATASOURCE);
final DruidTable table = new DatasourceTable(fooDs);
final RelDataType rowType = table.getRowType(new JavaTypeFactoryImpl());
final List<RelDataTypeField> fields = rowType.getFieldList();
Assert.assertEquals(9, fields.size());
Assert.assertEquals("__time", fields.get(0).getName());
Assert.assertEquals(SqlTypeName.TIMESTAMP, fields.get(0).getType().getSqlTypeName());
Assert.assertEquals("numbery", fields.get(1).getName());
Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(1).getType().getSqlTypeName());
Assert.assertEquals("numberyArrays", fields.get(2).getName());
Assert.assertEquals(SqlTypeName.ARRAY, fields.get(2).getType().getSqlTypeName());
Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(2).getType().getComponentType().getSqlTypeName());
Assert.assertEquals("stringy", fields.get(3).getName());
Assert.assertEquals(SqlTypeName.ARRAY, fields.get(3).getType().getSqlTypeName());
Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getComponentType().getSqlTypeName());
Assert.assertEquals("array", fields.get(4).getName());
Assert.assertEquals(SqlTypeName.ARRAY, fields.get(4).getType().getSqlTypeName());
Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(4).getType().getComponentType().getSqlTypeName());
Assert.assertEquals("nested", fields.get(5).getName());
Assert.assertEquals(SqlTypeName.OTHER, fields.get(5).getType().getSqlTypeName());
Assert.assertEquals("cnt", fields.get(6).getName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(6).getType().getSqlTypeName());
Assert.assertEquals("m1", fields.get(7).getName());
Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(7).getType().getSqlTypeName());
Assert.assertEquals("unique_dim1", fields.get(8).getName());
Assert.assertEquals(SqlTypeName.OTHER, fields.get(8).getType().getSqlTypeName());
}
/** /**
* This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct in case * This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct in case
* of multiple replicas i.e. when {@link SegmentMetadataCache#addSegment(DruidServerMetadata, DataSegment)} * of multiple replicas i.e. when {@link SegmentMetadataCache#addSegment(DruidServerMetadata, DataSegment)}
@ -396,13 +543,13 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
@Test @Test
public void testAvailableSegmentMetadataNumRows() throws InterruptedException public void testAvailableSegmentMetadataNumRows() throws InterruptedException
{ {
SegmentMetadataCache schema = buildSchema1(); SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot(); Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot();
final List<DataSegment> segments = segmentsMetadata.values() final List<DataSegment> segments = segmentsMetadata.values()
.stream() .stream()
.map(AvailableSegmentMetadata::getSegment) .map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList()); .collect(Collectors.toList());
Assert.assertEquals(4, segments.size()); Assert.assertEquals(6, segments.size());
// find the only segment with datasource "foo2" // find the only segment with datasource "foo2"
final DataSegment existingSegment = segments.stream() final DataSegment existingSegment = segments.stream()
.filter(segment -> segment.getDataSource().equals("foo2")) .filter(segment -> segment.getDataSource().equals("foo2"))
@ -446,13 +593,13 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
@Test @Test
public void testNullDatasource() throws IOException, InterruptedException public void testNullDatasource() throws IOException, InterruptedException
{ {
SegmentMetadataCache schema = buildSchema1(); SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot(); final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
final List<DataSegment> segments = segmentMetadatas.values() final List<DataSegment> segments = segmentMetadatas.values()
.stream() .stream()
.map(AvailableSegmentMetadata::getSegment) .map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList()); .collect(Collectors.toList());
Assert.assertEquals(4, segments.size()); Assert.assertEquals(6, segments.size());
// segments contains two segments with datasource "foo" and one with datasource "foo2" // segments contains two segments with datasource "foo" and one with datasource "foo2"
// let's remove the only segment with datasource "foo2" // let's remove the only segment with datasource "foo2"
final DataSegment segmentToRemove = segments.stream() final DataSegment segmentToRemove = segments.stream()
@ -465,19 +612,19 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
// The following line can cause NPE without segmentMetadata null check in // The following line can cause NPE without segmentMetadata null check in
// SegmentMetadataCache#refreshSegmentsForDataSource // SegmentMetadataCache#refreshSegmentsForDataSource
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
} }
@Test @Test
public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException
{ {
SegmentMetadataCache schema = buildSchema1(); SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot(); final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
final List<DataSegment> segments = segmentMetadatas.values() final List<DataSegment> segments = segmentMetadatas.values()
.stream() .stream()
.map(AvailableSegmentMetadata::getSegment) .map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList()); .collect(Collectors.toList());
Assert.assertEquals(4, segments.size()); Assert.assertEquals(6, segments.size());
// remove one of the segments with datasource "foo" // remove one of the segments with datasource "foo"
final DataSegment segmentToRemove = segments.stream() final DataSegment segmentToRemove = segments.stream()
.filter(segment -> segment.getDataSource().equals("foo")) .filter(segment -> segment.getDataSource().equals("foo"))
@ -489,13 +636,13 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
// The following line can cause NPE without segmentMetadata null check in // The following line can cause NPE without segmentMetadata null check in
// SegmentMetadataCache#refreshSegmentsForDataSource // SegmentMetadataCache#refreshSegmentsForDataSource
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
} }
@Test @Test
public void testAvailableSegmentMetadataIsRealtime() throws InterruptedException public void testAvailableSegmentMetadataIsRealtime() throws InterruptedException
{ {
SegmentMetadataCache schema = buildSchema1(); SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot(); Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot();
final List<DataSegment> segments = segmentsMetadata.values() final List<DataSegment> segments = segmentsMetadata.values()
.stream() .stream()
@ -576,7 +723,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.addSegment(newSegment(datasource, 1), ServerType.HISTORICAL); serverView.addSegment(newSegment(datasource, 1), ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(5, schema.getTotalSegments()); Assert.assertEquals(7, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot() .getSegmentMetadataSnapshot()
.values() .values()
@ -621,7 +768,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.addSegment(segment, ServerType.HISTORICAL); serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(5, schema.getTotalSegments()); Assert.assertEquals(7, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot() .getSegmentMetadataSnapshot()
.values() .values()
@ -666,7 +813,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.addSegment(newSegment(datasource, 1), ServerType.REALTIME); serverView.addSegment(newSegment(datasource, 1), ServerType.REALTIME);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(5, schema.getTotalSegments()); Assert.assertEquals(7, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot() .getSegmentMetadataSnapshot()
.values() .values()
@ -710,7 +857,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.addSegment(newSegment(datasource, 1), ServerType.BROKER); serverView.addSegment(newSegment(datasource, 1), ServerType.BROKER);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(4, schema.getTotalSegments()); Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot() .getSegmentMetadataSnapshot()
.values() .values()
@ -765,7 +912,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.removeSegment(segment, ServerType.REALTIME); serverView.removeSegment(segment, ServerType.REALTIME);
Assert.assertTrue(removeSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(removeSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(4, schema.getTotalSegments()); Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot() .getSegmentMetadataSnapshot()
.values() .values()
@ -827,7 +974,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.removeSegment(segments.get(0), ServerType.REALTIME); serverView.removeSegment(segments.get(0), ServerType.REALTIME);
Assert.assertTrue(removeSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(removeSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(5, schema.getTotalSegments()); Assert.assertEquals(7, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot() .getSegmentMetadataSnapshot()
.values() .values()
@ -872,7 +1019,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.removeSegment(newSegment(datasource, 1), ServerType.HISTORICAL); serverView.removeSegment(newSegment(datasource, 1), ServerType.HISTORICAL);
Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(4, schema.getTotalSegments()); Assert.assertEquals(6, schema.getTotalSegments());
} }
@Test @Test
@ -919,7 +1066,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.removeSegment(segment, ServerType.BROKER); serverView.removeSegment(segment, ServerType.BROKER);
Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(5, schema.getTotalSegments()); Assert.assertEquals(7, schema.getTotalSegments());
Assert.assertTrue(schema.getDataSourcesNeedingRebuild().contains(datasource)); Assert.assertTrue(schema.getDataSourcesNeedingRebuild().contains(datasource));
} }
@ -967,7 +1114,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
serverView.removeSegment(segment, ServerType.HISTORICAL); serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(5, schema.getTotalSegments()); Assert.assertEquals(7, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot() .getSegmentMetadataSnapshot()
.values() .values()
@ -995,7 +1142,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
@Test @Test
public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException
{ {
SegmentMetadataCache schema3 = buildSchema3(); SegmentMetadataCache schema3 = buildSchemaMarkAndRefreshLatch();
Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
DatasourceTable.PhysicalDatasourceMetadata fooTable = schema3.getDatasource("foo"); DatasourceTable.PhysicalDatasourceMetadata fooTable = schema3.getDatasource("foo");
Assert.assertNotNull(fooTable); Assert.assertNotNull(fooTable);
@ -1061,7 +1208,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
@Test @Test
public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException
{ {
SegmentMetadataCache schema = buildSchema3(); SegmentMetadataCache schema = buildSchemaMarkAndRefreshLatch();
Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo"); DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable); Assert.assertNotNull(fooTable);
@ -1291,7 +1438,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
@Test @Test
public void testStaleDatasourceRefresh() throws IOException, InterruptedException public void testStaleDatasourceRefresh() throws IOException, InterruptedException
{ {
SegmentMetadataCache schema = buildSchema1(); SegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
Set<SegmentId> segments = new HashSet<>(); Set<SegmentId> segments = new HashSet<>();
Set<String> datasources = new HashSet<>(); Set<String> datasources = new HashSet<>();
datasources.add("wat"); datasources.add("wat");