diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index abf21428fe0..a367c6b597f 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1953,6 +1953,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
+|`druid.sql.planner.metadataColumnTypeMergePolicy`|Defines how column types will be chosen for the SQL schema when segments report different types for the same column. Options are `leastRestrictive` or `latestInterval`. For `leastRestrictive`, Druid will automatically widen the type computed for the schema to a type that data across all segments can be converted into. However, planned schema migrations can only take effect once all segments have been re-ingested to the new schema. With `latestInterval`, the column type in the most recent time chunks defines the type for the schema.|`leastRestrictive`|
|`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|true|
|`druid.sql.planner.useGroupingSetForExactDistinct`|Only relevant when `useApproximateCountDistinct` is disabled. If set to true, exact distinct queries are re-written using grouping sets. Otherwise, exact distinct queries are re-written using joins. This should be set to true for group by query with multiple exact distinct aggregations. This flag can be overridden per query.|false|
|`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.md) when a SQL query could be expressed as such.
If false, exact [GroupBy queries](../querying/groupbyquery.md) will be used instead.|true| diff --git a/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java b/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java index 584a77ed368..1221fa55009 100644 --- a/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java +++ b/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java @@ -19,7 +19,6 @@ package org.apache.druid.math.expr; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.segment.column.Types; import javax.annotation.Nullable; @@ -83,7 +82,7 @@ public class ExpressionTypeConversion } if (type.isArray() || other.isArray()) { if (!Objects.equals(type, other)) { - throw new IAE("Cannot implicitly cast %s to %s", type, other); + throw new Types.IncompatibleTypeException(type, other); } return type; } @@ -95,7 +94,7 @@ public class ExpressionTypeConversion return type; } if (!Objects.equals(type, other)) { - throw new IAE("Cannot implicitly cast %s to %s", type, other); + throw new Types.IncompatibleTypeException(type, other); } return type; } @@ -128,7 +127,7 @@ public class ExpressionTypeConversion // arrays cannot be auto converted if (type.isArray() || other.isArray()) { if (!Objects.equals(type, other)) { - throw new IAE("Cannot implicitly cast %s to %s", type, other); + throw new Types.IncompatibleTypeException(type, other); } return type; } @@ -140,7 +139,7 @@ public class ExpressionTypeConversion return type; } if (!Objects.equals(type, other)) { - throw new IAE("Cannot implicitly cast %s to %s", type, other); + throw new Types.IncompatibleTypeException(type, other); } return type; } @@ -177,7 +176,7 @@ public class ExpressionTypeConversion return ExpressionTypeFactory.getInstance().ofArray(newElementType); } - throw new IAE("Cannot implicitly cast %s to %s", type, other); + throw new Types.IncompatibleTypeException(type, other); } diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java index f1a4ea7bc09..c3a3047e0fd 100644 --- a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java @@ -93,6 +93,9 @@ public class NestedDataColumnMerger implements DimensionMergerV9 final IndexableAdapter.NestedColumnMergable mergable = closer.register( adapter.getNestedColumnMergeables(name) ); + if (mergable == null) { + continue; + } final SortedValueDictionary dimValues = mergable.getValueDictionary(); boolean allNulls = dimValues == null || dimValues.allNull(); diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java index 9891bac9b4e..dbfed07749c 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import javax.annotation.Nullable; @@ -152,7 +151,7 @@ public class ColumnType 
extends BaseTypeSignature * inference */ @Nullable - public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullable ColumnType other) + public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullable ColumnType other) throws Types.IncompatibleTypeException { if (type == null) { return other; @@ -168,7 +167,7 @@ public class ColumnType extends BaseTypeSignature return type; } if (!Objects.equals(type, other)) { - throw new IAE("Cannot implicitly cast %s to %s", type, other); + throw new Types.IncompatibleTypeException(type, other); } return type; } @@ -177,7 +176,7 @@ public class ColumnType extends BaseTypeSignature if (ColumnType.NESTED_DATA.equals(type) || ColumnType.NESTED_DATA.equals(other)) { return ColumnType.NESTED_DATA; } - throw new IAE("Cannot implicitly cast %s to %s", type, other); + throw new Types.IncompatibleTypeException(type, other); } // arrays convert based on least restrictive element type @@ -186,11 +185,13 @@ public class ColumnType extends BaseTypeSignature return type; } final ColumnType commonElementType; + // commonElementType cannot be null if we got this far, we always return a value unless both args are null if (other.isArray()) { commonElementType = leastRestrictiveType( (ColumnType) type.getElementType(), (ColumnType) other.getElementType() ); + return ColumnType.ofArray(commonElementType); } else { commonElementType = leastRestrictiveType( @@ -218,13 +219,14 @@ public class ColumnType extends BaseTypeSignature } // all numbers win over longs - // floats vs doubles would be handled here, but we currently only support doubles... if (Types.is(type, ValueType.LONG) && Types.isNullOr(other, ValueType.LONG)) { return ColumnType.LONG; } + // doubles win over floats if (Types.is(type, ValueType.FLOAT) && Types.isNullOr(other, ValueType.FLOAT)) { return ColumnType.FLOAT; } return ColumnType.DOUBLE; } + } diff --git a/processing/src/main/java/org/apache/druid/segment/column/Types.java b/processing/src/main/java/org/apache/druid/segment/column/Types.java index 44c7b13720e..02da5c8bfa7 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/Types.java +++ b/processing/src/main/java/org/apache/druid/segment/column/Types.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.column; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; @@ -112,4 +113,12 @@ public class Types return (typeSignature1 != null && typeSignature1.is(typeDescriptor)) || (typeSignature2 != null && typeSignature2.is(typeDescriptor)); } + + public static class IncompatibleTypeException extends IAE + { + public IncompatibleTypeException(TypeSignature type, TypeSignature other) + { + super("Cannot implicitly cast [%s] to [%s]", type, other); + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/column/ColumnTypeTest.java b/processing/src/test/java/org/apache/druid/segment/column/ColumnTypeTest.java index 89f0aeb80f3..a66a9ad2d61 100644 --- a/processing/src/test/java/org/apache/druid/segment/column/ColumnTypeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/column/ColumnTypeTest.java @@ -163,8 +163,8 @@ public class ColumnTypeTest Assert.assertEquals(ColumnType.NESTED_DATA, ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, ColumnType.UNKNOWN_COMPLEX)); Assert.assertEquals(SOME_COMPLEX, ColumnType.leastRestrictiveType(SOME_COMPLEX, ColumnType.UNKNOWN_COMPLEX)); 
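// compatible types widen rather than throw: for example, LONG with DOUBLE yields DOUBLE, ARRAY<LONG> with
// ARRAY<DOUBLE> yields ARRAY<DOUBLE>, and STRING with ARRAY<STRING> yields ARRAY<STRING> (the same behavior is
// exercised by the SegmentMetadataCacheTest changes later in this patch); truly incompatible combinations, checked
// next, now surface as Types.IncompatibleTypeException (a subclass of IAE) instead of a plain IllegalArgumentException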
Assert.assertEquals(SOME_COMPLEX, ColumnType.leastRestrictiveType(ColumnType.UNKNOWN_COMPLEX, SOME_COMPLEX)); - Assert.assertThrows(IllegalArgumentException.class, () -> ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, SOME_COMPLEX)); - Assert.assertThrows(IllegalArgumentException.class, () -> ColumnType.leastRestrictiveType(ColumnType.STRING_ARRAY, SOME_COMPLEX)); + Assert.assertThrows(Types.IncompatibleTypeException.class, () -> ColumnType.leastRestrictiveType(ColumnType.NESTED_DATA, SOME_COMPLEX)); + Assert.assertThrows(Types.IncompatibleTypeException.class, () -> ColumnType.leastRestrictiveType(ColumnType.STRING_ARRAY, SOME_COMPLEX)); } static class SomeOtherTypeSignature extends BaseTypeSignature diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModule.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModule.java index 3b8e8b7ec08..484de5c4659 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModule.java @@ -31,6 +31,8 @@ import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider; */ public class CalcitePlannerModule implements Module { + public static final String CONFIG_BASE = "druid.sql.planner"; + @Override public void configure(Binder binder) { @@ -38,8 +40,8 @@ public class CalcitePlannerModule implements Module // so both configs are bound to the same property prefix. // It turns out that the order of the arguments above is misleading. // We're actually binding the class to the config prefix, not the other way around. - JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class); - JsonConfigProvider.bind(binder, "druid.sql.planner", SegmentMetadataCacheConfig.class); + JsonConfigProvider.bind(binder, CONFIG_BASE, PlannerConfig.class); + JsonConfigProvider.bind(binder, CONFIG_BASE, SegmentMetadataCacheConfig.class); binder.bind(PlannerFactory.class).in(LazySingleton.class); binder.bind(DruidOperatorTable.class).in(LazySingleton.class); Multibinder.newSetBinder(binder, ExtensionCalciteRuleProvider.class); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java index 17e342c4feb..dc4d94b78b3 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java @@ -20,10 +20,9 @@ package org.apache.druid.sql.calcite.planner; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.sql.calcite.schema.SegmentMetadataCache; import org.joda.time.Period; -import java.util.Objects; - /** * Configuration properties for the Broker-side cache of segment metadata * used to infer datasources for SQL. 
This class shares the same config root @@ -32,6 +31,9 @@ import java.util.Objects; */ public class SegmentMetadataCacheConfig { + @JsonProperty + private boolean awaitInitializationOnStart = true; + @JsonProperty private boolean metadataSegmentCacheEnable = false; @@ -42,7 +44,8 @@ public class SegmentMetadataCacheConfig private Period metadataRefreshPeriod = new Period("PT1M"); @JsonProperty - private boolean awaitInitializationOnStart = true; + private SegmentMetadataCache.ColumnTypeMergePolicy metadataColumnTypeMergePolicy = + new SegmentMetadataCache.LeastRestrictiveTypeMergePolicy(); public static SegmentMetadataCacheConfig create() { @@ -78,31 +81,9 @@ public class SegmentMetadataCacheConfig return metadataSegmentPollPeriod; } - @Override - public boolean equals(final Object o) + public SegmentMetadataCache.ColumnTypeMergePolicy getMetadataColumnTypeMergePolicy() { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final SegmentMetadataCacheConfig that = (SegmentMetadataCacheConfig) o; - return awaitInitializationOnStart == that.awaitInitializationOnStart && - metadataSegmentCacheEnable == that.metadataSegmentCacheEnable && - metadataSegmentPollPeriod == that.metadataSegmentPollPeriod && - Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod); - } - - @Override - public int hashCode() - { - return Objects.hash( - metadataRefreshPeriod, - awaitInitializationOnStart, - metadataSegmentCacheEnable, - metadataSegmentPollPeriod - ); + return metadataColumnTypeMergePolicy; } @Override @@ -113,6 +94,7 @@ public class SegmentMetadataCacheConfig ", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable + ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod + ", awaitInitializationOnStart=" + awaitInitializationOnStart + + ", metadataColumnTypeMergePolicy=" + metadataColumnTypeMergePolicy + '}'; } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java index 8da4e647a97..71c19734ec0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.schema; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -36,6 +37,7 @@ import org.apache.druid.client.ServerView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; @@ -56,6 +58,7 @@ import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.Types; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.SegmentManager; @@ -75,6 +78,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; 
+import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.TreeMap; @@ -107,7 +111,6 @@ public class SegmentMetadataCache private static final int MAX_SEGMENTS_PER_QUERY = 15000; private static final long DEFAULT_NUM_ROWS = 0; private static final Interner ROW_SIGNATURE_INTERNER = Interners.newWeakInterner(); - private final QueryLifecycleFactory queryLifecycleFactory; private final SegmentMetadataCacheConfig config; // Escalator, so we can attach an authentication result to queries we generate. @@ -117,6 +120,7 @@ public class SegmentMetadataCache private final ExecutorService cacheExec; private final ExecutorService callbackExec; private final ServiceEmitter emitter; + private final ColumnTypeMergePolicy columnTypeMergePolicy; /** * Map of DataSource -> DruidTable. @@ -229,6 +233,7 @@ public class SegmentMetadataCache this.segmentManager = segmentManager; this.joinableFactory = joinableFactory; this.config = Preconditions.checkNotNull(config, "config"); + this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy(); this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d"); this.escalator = escalator; @@ -803,25 +808,11 @@ public class SegmentMetadataCache final RowSignature rowSignature = availableSegmentMetadata.getRowSignature(); if (rowSignature != null) { for (String column : rowSignature.getColumnNames()) { - // Newer column types should override older ones. final ColumnType columnType = rowSignature.getColumnType(column) .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column)); - columnTypes.compute(column, (c, existingType) -> { - if (existingType == null) { - return columnType; - } - if (columnType == null) { - return existingType; - } - // if any are json, are all json - if (ColumnType.NESTED_DATA.equals(columnType) || ColumnType.NESTED_DATA.equals(existingType)) { - return ColumnType.NESTED_DATA; - } - // "existing type" is the 'newest' type, since we iterate the segments list by newest start time - return existingType; - }); + columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType)); } } } @@ -995,4 +986,126 @@ public class SegmentMetadataCache runnable.run(); } } + + + /** + * ColumnTypeMergePolicy defines the rules of which type to use when faced with the possibility of different types + * for the same column from segment to segment. It is used to help compute a {@link RowSignature} for a table in + * Druid based on the segment metadata of all segments, merging the types of each column encountered to end up with + * a single type to represent it globally. + */ + @FunctionalInterface + public interface ColumnTypeMergePolicy + { + ColumnType merge(ColumnType existingType, ColumnType newType); + + @JsonCreator + static ColumnTypeMergePolicy fromString(String type) + { + if (LeastRestrictiveTypeMergePolicy.NAME.equalsIgnoreCase(type)) { + return LeastRestrictiveTypeMergePolicy.INSTANCE; + } + if (FirstTypeMergePolicy.NAME.equalsIgnoreCase(type)) { + return FirstTypeMergePolicy.INSTANCE; + } + throw new IAE("Unknown type [%s]", type); + } + } + + /** + * Classic logic, we use the first type we encounter. This policy is effectively 'newest first' because we iterated + * segments starting from the most recent time chunk, so this typically results in the most recently used type being + * chosen, at least for systems that are continuously updated with 'current' data. 
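+ *
+ * For example (a sketch; the variable names are illustrative): if the newest segment reports a column as LONG and an
+ * older segment reports it as DOUBLE, this policy keeps LONG, while {@link LeastRestrictiveTypeMergePolicy} widens
+ * the schema type to DOUBLE:
+ * <pre>{@code
+ * ColumnTypeMergePolicy newestFirst = ColumnTypeMergePolicy.fromString("latestInterval");
+ * ColumnTypeMergePolicy widening = ColumnTypeMergePolicy.fromString("leastRestrictive");
+ * newestFirst.merge(ColumnType.LONG, ColumnType.DOUBLE);  // LONG: the existing ('newest') type wins
+ * widening.merge(ColumnType.LONG, ColumnType.DOUBLE);     // DOUBLE: least restrictive common type
+ * }</pre>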
+ * + * Since {@link ColumnTypeMergePolicy} are used to compute the SQL schema, at least in systems using SQL schemas which + * are partially or fully computed by this cache, this merge policy can result in query time errors if incompatible + * types are mixed if the chosen type is more restrictive than the types of some segments. If data is likely to vary + * in type across segments, consider using {@link LeastRestrictiveTypeMergePolicy} instead. + */ + public static class FirstTypeMergePolicy implements ColumnTypeMergePolicy + { + public static final String NAME = "latestInterval"; + private static final FirstTypeMergePolicy INSTANCE = new FirstTypeMergePolicy(); + + @Override + public ColumnType merge(ColumnType existingType, ColumnType newType) + { + if (existingType == null) { + return newType; + } + if (newType == null) { + return existingType; + } + // if any are json, are all json + if (ColumnType.NESTED_DATA.equals(newType) || ColumnType.NESTED_DATA.equals(existingType)) { + return ColumnType.NESTED_DATA; + } + // "existing type" is the 'newest' type, since we iterate the segments list by newest start time + return existingType; + } + + @Override + public int hashCode() + { + return Objects.hash(NAME); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } + + @Override + public String toString() + { + return NAME; + } + } + + /** + * Resolves types using {@link ColumnType#leastRestrictiveType(ColumnType, ColumnType)} to find the ColumnType that + * can best represent all data contained across all segments. + */ + public static class LeastRestrictiveTypeMergePolicy implements ColumnTypeMergePolicy + { + public static final String NAME = "leastRestrictive"; + private static final LeastRestrictiveTypeMergePolicy INSTANCE = new LeastRestrictiveTypeMergePolicy(); + + @Override + public ColumnType merge(ColumnType existingType, ColumnType newType) + { + try { + return ColumnType.leastRestrictiveType(existingType, newType); + } + catch (Types.IncompatibleTypeException incompatibleTypeException) { + // fall back to first encountered type if they are not compatible for some reason + return FirstTypeMergePolicy.INSTANCE.merge(existingType, newType); + } + } + + @Override + public int hashCode() + { + return Objects.hash(NAME); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } + + @Override + public String toString() + { + return NAME; + } + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfigTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfigTest.java index 6afa5c77282..858f2e5d8ac 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfigTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfigTest.java @@ -19,12 +19,17 @@ package org.apache.druid.sql.calcite.planner; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.JsonConfigurator; +import org.apache.druid.sql.calcite.schema.SegmentMetadataCache; import org.joda.time.Period; +import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static 
org.junit.Assert.assertTrue; +import java.util.Properties; /** * Pathetic little unit test just to keep Jacoco happy. @@ -32,19 +37,60 @@ import static org.junit.Assert.assertTrue; public class SegmentMetadataCacheConfigTest { @Test - public void testConfig() + public void testDefaultConfig() { - SegmentMetadataCacheConfig config = SegmentMetadataCacheConfig.create("PT1M"); - assertEquals(Period.minutes(1), config.getMetadataRefreshPeriod()); - assertTrue(config.isAwaitInitializationOnStart()); - // Not legal per IntelliJ inspections. Should be testable, but IntelliJ - // won't allow this code. - //assertTrue(config.equals(config)); - // Workaround - assertTrue(config.equals(SegmentMetadataCacheConfig.create("PT1M"))); - assertFalse(config.equals(null)); - assertTrue(config.equals(SegmentMetadataCacheConfig.create("PT1M"))); - assertFalse(config.equals(SegmentMetadataCacheConfig.create("PT2M"))); - assertTrue(config.hashCode() != 0); + final Injector injector = createInjector(); + final JsonConfigProvider provider = JsonConfigProvider.of( + CalcitePlannerModule.CONFIG_BASE, + SegmentMetadataCacheConfig.class + ); + final Properties properties = new Properties(); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final SegmentMetadataCacheConfig config = provider.get(); + Assert.assertTrue(config.isAwaitInitializationOnStart()); + Assert.assertFalse(config.isMetadataSegmentCacheEnable()); + Assert.assertEquals(Period.minutes(1), config.getMetadataRefreshPeriod()); + Assert.assertEquals(60_000, config.getMetadataSegmentPollPeriod()); + Assert.assertEquals(new SegmentMetadataCache.LeastRestrictiveTypeMergePolicy(), config.getMetadataColumnTypeMergePolicy()); + } + + @Test + public void testCustomizedConfig() + { + final Injector injector = createInjector(); + final JsonConfigProvider provider = JsonConfigProvider.of( + CalcitePlannerModule.CONFIG_BASE, + SegmentMetadataCacheConfig.class + ); + final Properties properties = new Properties(); + properties.setProperty( + CalcitePlannerModule.CONFIG_BASE + ".metadataColumnTypeMergePolicy", + "latestInterval" + ); + properties.setProperty(CalcitePlannerModule.CONFIG_BASE + ".metadataRefreshPeriod", "PT2M"); + properties.setProperty(CalcitePlannerModule.CONFIG_BASE + ".metadataSegmentPollPeriod", "15000"); + properties.setProperty(CalcitePlannerModule.CONFIG_BASE + ".metadataSegmentCacheEnable", "true"); + properties.setProperty(CalcitePlannerModule.CONFIG_BASE + ".awaitInitializationOnStart", "false"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final SegmentMetadataCacheConfig config = provider.get(); + Assert.assertFalse(config.isAwaitInitializationOnStart()); + Assert.assertTrue(config.isMetadataSegmentCacheEnable()); + Assert.assertEquals(Period.minutes(2), config.getMetadataRefreshPeriod()); + Assert.assertEquals(15_000, config.getMetadataSegmentPollPeriod()); + Assert.assertEquals( + new SegmentMetadataCache.FirstTypeMergePolicy(), + config.getMetadataColumnTypeMergePolicy() + ); + } + + private Injector createInjector() + { + return GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + binder -> { + JsonConfigProvider.bind(binder, CalcitePlannerModule.CONFIG_BASE, SegmentMetadataCacheConfig.class); + } + ) + ); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java index 213ad453a6e..fb55d227109 100644 --- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java @@ -32,6 +32,10 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequences; @@ -64,10 +68,12 @@ import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.NoopEscalator; +import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig; import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; +import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; @@ -137,6 +143,78 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon ) .rows(ROWS2) .buildMMappedIndex(); + + final InputRowSchema rowSchema = new InputRowSchema( + new TimestampSpec("t", null, null), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), + null + ); + final List autoRows1 = ImmutableList.of( + TestDataBuilder.createRow( + ImmutableMap.builder() + .put("t", "2023-01-01T00:00Z") + .put("numbery", 1.1f) + .put("numberyArrays", ImmutableList.of(1L, 2L, 3L)) + .put("stringy", ImmutableList.of("a", "b", "c")) + .put("array", ImmutableList.of(1.1, 2.2, 3.3)) + .put("nested", ImmutableMap.of("x", 1L, "y", 2L)) + .build(), + rowSchema + ) + ); + final List autoRows2 = ImmutableList.of( + TestDataBuilder.createRow( + ImmutableMap.builder() + .put("t", "2023-01-02T00:00Z") + .put("numbery", 1L) + .put("numberyArrays", ImmutableList.of(3.3, 2.2, 3.1)) + .put("stringy", "a") + .put("array", ImmutableList.of(1L, 2L, 3L)) + .put("nested", "hello") + .build(), + rowSchema + ) + ); + final QueryableIndex indexAuto1 = IndexBuilder.create() + .tmpDir(new File(tmpDir, "1")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withTimestampSpec(rowSchema.getTimestampSpec()) + .withDimensionsSpec(rowSchema.getDimensionsSpec()) + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new HyperUniquesAggregatorFactory("unique_dim1", "dim1") + ) + .withRollup(false) + .build() + ) + .rows(autoRows1) + .buildMMappedIndex(); + + final QueryableIndex indexAuto2 = IndexBuilder.create() + .tmpDir(new File(tmpDir, "1")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withTimestampSpec( + new TimestampSpec("t", null, null) + ) + .withDimensionsSpec( + DimensionsSpec.builder().useSchemaDiscovery(true).build() + ) + 
.withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new HyperUniquesAggregatorFactory("unique_dim1", "dim1") + ) + .withRollup(false) + .build() + ) + .rows(autoRows2) + .buildMMappedIndex(); + walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() .dataSource(CalciteTests.DATASOURCE1) @@ -164,6 +242,24 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon .size(0) .build(), index2 + ).add( + DataSegment.builder() + .dataSource(CalciteTests.SOME_DATASOURCE) + .interval(Intervals.of("2023-01-01T00Z/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1)) + .size(0) + .build(), + indexAuto1 + ).add( + DataSegment.builder() + .dataSource(CalciteTests.SOME_DATASOURCE) + .interval(Intervals.of("2023-01-02T00Z/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1)) + .size(0) + .build(), + indexAuto2 ); final DataSegment segment1 = new DataSegment( "foo3", @@ -183,7 +279,12 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon druidServers = serverView.getDruidServers(); } - public SegmentMetadataCache buildSchema1() throws InterruptedException + public SegmentMetadataCache buildSchemaMarkAndTableLatch() throws InterruptedException + { + return buildSchemaMarkAndTableLatch(SEGMENT_CACHE_CONFIG_DEFAULT); + } + + public SegmentMetadataCache buildSchemaMarkAndTableLatch(SegmentMetadataCacheConfig config) throws InterruptedException { Preconditions.checkState(runningSchema == null); runningSchema = new SegmentMetadataCache( @@ -194,7 +295,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class) ), - SEGMENT_CACHE_CONFIG_DEFAULT, + config, new NoopEscalator(), new BrokerInternalQueryConfig(), new NoopServiceEmitter() @@ -221,58 +322,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon return runningSchema; } - public SegmentMetadataCache buildSchema2() throws InterruptedException - { - Preconditions.checkState(runningSchema == null); - runningSchema = new SegmentMetadataCache( - CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), - serverView, - segmentManager, - new MapJoinableFactory( - ImmutableSet.of(globalTableJoinable), - ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class) - ), - SEGMENT_CACHE_CONFIG_DEFAULT, - new NoopEscalator(), - new BrokerInternalQueryConfig(), - new NoopServiceEmitter() - ) - { - boolean throwException = true; - - @Override - protected DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String dataSource) - { - DatasourceTable.PhysicalDatasourceMetadata table = super.buildDruidTable(dataSource); - buildTableLatch.countDown(); - return table; - } - - @Override - protected Set refreshSegments(final Set segments) throws IOException - { - if (throwException) { - throwException = false; - throw new RuntimeException("Query[xxxx] url[http://xxxx:8083/druid/v2/] timed out."); - } else { - return super.refreshSegments(segments); - } - } - - @Override - void markDataSourceAsNeedRebuild(String datasource) - { - super.markDataSourceAsNeedRebuild(datasource); - markDataSourceLatch.countDown(); - } - }; - - runningSchema.start(); - runningSchema.awaitInitialization(); - return runningSchema; - } - - public SegmentMetadataCache buildSchema3() throws InterruptedException + public SegmentMetadataCache buildSchemaMarkAndRefreshLatch() throws 
InterruptedException { Preconditions.checkState(runningSchema == null); runningSchema = new SegmentMetadataCache( @@ -322,24 +372,24 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon @Test public void testGetTableMap() throws InterruptedException { - SegmentMetadataCache schema = buildSchema1(); - Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema.getDatasourceNames()); + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); + Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), schema.getDatasourceNames()); final Set tableNames = schema.getDatasourceNames(); - Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableNames); + Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), tableNames); } @Test public void testSchemaInit() throws InterruptedException { - SegmentMetadataCache schema2 = buildSchema1(); - Assert.assertEquals(ImmutableSet.of("foo", "foo2"), schema2.getDatasourceNames()); + SegmentMetadataCache schema2 = buildSchemaMarkAndTableLatch(); + Assert.assertEquals(ImmutableSet.of(CalciteTests.DATASOURCE1, CalciteTests.DATASOURCE2, CalciteTests.SOME_DATASOURCE), schema2.getDatasourceNames()); } @Test public void testGetTableMapFoo() throws InterruptedException { - SegmentMetadataCache schema = buildSchema1(); + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo"); final DruidTable fooTable = new DatasourceTable(fooDs); final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl()); @@ -354,7 +404,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(1).getType().getSqlTypeName()); Assert.assertEquals("m1", fields.get(2).getName()); - Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName()); + Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(2).getType().getSqlTypeName()); Assert.assertEquals("dim1", fields.get(3).getName()); Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName()); @@ -369,7 +419,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon @Test public void testGetTableMapFoo2() throws InterruptedException { - SegmentMetadataCache schema = buildSchema1(); + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo2"); final DruidTable fooTable = new DatasourceTable(fooDs); final RelDataType rowType = fooTable.getRowType(new JavaTypeFactoryImpl()); @@ -387,6 +437,103 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName()); } + @Test + public void testGetTableMapSomeTable() throws InterruptedException + { + // using 'newest first' column type merge strategy, the types are expected to be the types defined in the newer + // segment, except for json, which is special handled + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch( + new SegmentMetadataCacheConfig() { + @Override + public SegmentMetadataCache.ColumnTypeMergePolicy getMetadataColumnTypeMergePolicy() + { + return new SegmentMetadataCache.FirstTypeMergePolicy(); + } + } + ); + final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource(CalciteTests.SOME_DATASOURCE); + final 
DruidTable table = new DatasourceTable(fooDs); + final RelDataType rowType = table.getRowType(new JavaTypeFactoryImpl()); + final List fields = rowType.getFieldList(); + + Assert.assertEquals(9, fields.size()); + + Assert.assertEquals("__time", fields.get(0).getName()); + Assert.assertEquals(SqlTypeName.TIMESTAMP, fields.get(0).getType().getSqlTypeName()); + + Assert.assertEquals("numbery", fields.get(1).getName()); + Assert.assertEquals(SqlTypeName.BIGINT, fields.get(1).getType().getSqlTypeName()); + + Assert.assertEquals("numberyArrays", fields.get(2).getName()); + Assert.assertEquals(SqlTypeName.ARRAY, fields.get(2).getType().getSqlTypeName()); + Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(2).getType().getComponentType().getSqlTypeName()); + + Assert.assertEquals("stringy", fields.get(3).getName()); + Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName()); + + Assert.assertEquals("array", fields.get(4).getName()); + Assert.assertEquals(SqlTypeName.ARRAY, fields.get(4).getType().getSqlTypeName()); + Assert.assertEquals(SqlTypeName.BIGINT, fields.get(4).getType().getComponentType().getSqlTypeName()); + + Assert.assertEquals("nested", fields.get(5).getName()); + Assert.assertEquals(SqlTypeName.OTHER, fields.get(5).getType().getSqlTypeName()); + + Assert.assertEquals("cnt", fields.get(6).getName()); + Assert.assertEquals(SqlTypeName.BIGINT, fields.get(6).getType().getSqlTypeName()); + + Assert.assertEquals("m1", fields.get(7).getName()); + Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(7).getType().getSqlTypeName()); + + Assert.assertEquals("unique_dim1", fields.get(8).getName()); + Assert.assertEquals(SqlTypeName.OTHER, fields.get(8).getType().getSqlTypeName()); + } + + @Test + public void testGetTableMapSomeTableLeastRestrictiveTypeMerge() throws InterruptedException + { + // using 'least restrictive' column type merge strategy, the types are expected to be the types defined as the + // least restrictive blend across all segments + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); + final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource(CalciteTests.SOME_DATASOURCE); + final DruidTable table = new DatasourceTable(fooDs); + final RelDataType rowType = table.getRowType(new JavaTypeFactoryImpl()); + final List fields = rowType.getFieldList(); + + Assert.assertEquals(9, fields.size()); + + Assert.assertEquals("__time", fields.get(0).getName()); + Assert.assertEquals(SqlTypeName.TIMESTAMP, fields.get(0).getType().getSqlTypeName()); + + Assert.assertEquals("numbery", fields.get(1).getName()); + Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(1).getType().getSqlTypeName()); + + Assert.assertEquals("numberyArrays", fields.get(2).getName()); + Assert.assertEquals(SqlTypeName.ARRAY, fields.get(2).getType().getSqlTypeName()); + Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(2).getType().getComponentType().getSqlTypeName()); + + Assert.assertEquals("stringy", fields.get(3).getName()); + Assert.assertEquals(SqlTypeName.ARRAY, fields.get(3).getType().getSqlTypeName()); + Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getComponentType().getSqlTypeName()); + + Assert.assertEquals("array", fields.get(4).getName()); + Assert.assertEquals(SqlTypeName.ARRAY, fields.get(4).getType().getSqlTypeName()); + Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(4).getType().getComponentType().getSqlTypeName()); + + Assert.assertEquals("nested", fields.get(5).getName()); + Assert.assertEquals(SqlTypeName.OTHER, 
fields.get(5).getType().getSqlTypeName()); + + Assert.assertEquals("cnt", fields.get(6).getName()); + Assert.assertEquals(SqlTypeName.BIGINT, fields.get(6).getType().getSqlTypeName()); + + Assert.assertEquals("m1", fields.get(7).getName()); + Assert.assertEquals(SqlTypeName.DOUBLE, fields.get(7).getType().getSqlTypeName()); + + Assert.assertEquals("unique_dim1", fields.get(8).getName()); + Assert.assertEquals(SqlTypeName.OTHER, fields.get(8).getType().getSqlTypeName()); + } + + + /** * This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct in case * of multiple replicas i.e. when {@link SegmentMetadataCache#addSegment(DruidServerMetadata, DataSegment)} @@ -396,13 +543,13 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon @Test public void testAvailableSegmentMetadataNumRows() throws InterruptedException { - SegmentMetadataCache schema = buildSchema1(); + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); Map segmentsMetadata = schema.getSegmentMetadataSnapshot(); final List segments = segmentsMetadata.values() .stream() .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); - Assert.assertEquals(4, segments.size()); + Assert.assertEquals(6, segments.size()); // find the only segment with datasource "foo2" final DataSegment existingSegment = segments.stream() .filter(segment -> segment.getDataSource().equals("foo2")) @@ -446,13 +593,13 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon @Test public void testNullDatasource() throws IOException, InterruptedException { - SegmentMetadataCache schema = buildSchema1(); + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); final Map segmentMetadatas = schema.getSegmentMetadataSnapshot(); final List segments = segmentMetadatas.values() .stream() .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); - Assert.assertEquals(4, segments.size()); + Assert.assertEquals(6, segments.size()); // segments contains two segments with datasource "foo" and one with datasource "foo2" // let's remove the only segment with datasource "foo2" final DataSegment segmentToRemove = segments.stream() @@ -465,19 +612,19 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon // The following line can cause NPE without segmentMetadata null check in // SegmentMetadataCache#refreshSegmentsForDataSource schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); - Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); + Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size()); } @Test public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException { - SegmentMetadataCache schema = buildSchema1(); + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); final Map segmentMetadatas = schema.getSegmentMetadataSnapshot(); final List segments = segmentMetadatas.values() .stream() .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); - Assert.assertEquals(4, segments.size()); + Assert.assertEquals(6, segments.size()); // remove one of the segments with datasource "foo" final DataSegment segmentToRemove = segments.stream() .filter(segment -> segment.getDataSource().equals("foo")) @@ -489,13 +636,13 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon // The following line can cause NPE without segmentMetadata null check in // SegmentMetadataCache#refreshSegmentsForDataSource 
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); - Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); + Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size()); } @Test public void testAvailableSegmentMetadataIsRealtime() throws InterruptedException { - SegmentMetadataCache schema = buildSchema1(); + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); Map segmentsMetadata = schema.getSegmentMetadataSnapshot(); final List segments = segmentsMetadata.values() .stream() @@ -576,7 +723,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.addSegment(newSegment(datasource, 1), ServerType.HISTORICAL); Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(5, schema.getTotalSegments()); + Assert.assertEquals(7, schema.getTotalSegments()); List metadatas = schema .getSegmentMetadataSnapshot() .values() @@ -621,7 +768,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.addSegment(segment, ServerType.HISTORICAL); Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(5, schema.getTotalSegments()); + Assert.assertEquals(7, schema.getTotalSegments()); List metadatas = schema .getSegmentMetadataSnapshot() .values() @@ -666,7 +813,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.addSegment(newSegment(datasource, 1), ServerType.REALTIME); Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(5, schema.getTotalSegments()); + Assert.assertEquals(7, schema.getTotalSegments()); List metadatas = schema .getSegmentMetadataSnapshot() .values() @@ -710,7 +857,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.addSegment(newSegment(datasource, 1), ServerType.BROKER); Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(4, schema.getTotalSegments()); + Assert.assertEquals(6, schema.getTotalSegments()); List metadatas = schema .getSegmentMetadataSnapshot() .values() @@ -765,7 +912,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.removeSegment(segment, ServerType.REALTIME); Assert.assertTrue(removeSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(4, schema.getTotalSegments()); + Assert.assertEquals(6, schema.getTotalSegments()); List metadatas = schema .getSegmentMetadataSnapshot() .values() @@ -827,7 +974,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.removeSegment(segments.get(0), ServerType.REALTIME); Assert.assertTrue(removeSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(5, schema.getTotalSegments()); + Assert.assertEquals(7, schema.getTotalSegments()); List metadatas = schema .getSegmentMetadataSnapshot() .values() @@ -872,7 +1019,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.removeSegment(newSegment(datasource, 1), ServerType.HISTORICAL); Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(4, schema.getTotalSegments()); + Assert.assertEquals(6, schema.getTotalSegments()); } @Test @@ -919,7 +1066,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.removeSegment(segment, ServerType.BROKER); Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(5, schema.getTotalSegments()); + 
Assert.assertEquals(7, schema.getTotalSegments()); Assert.assertTrue(schema.getDataSourcesNeedingRebuild().contains(datasource)); } @@ -967,7 +1114,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon serverView.removeSegment(segment, ServerType.HISTORICAL); Assert.assertTrue(removeServerSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(5, schema.getTotalSegments()); + Assert.assertEquals(7, schema.getTotalSegments()); List metadatas = schema .getSegmentMetadataSnapshot() .values() @@ -995,7 +1142,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon @Test public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException { - SegmentMetadataCache schema3 = buildSchema3(); + SegmentMetadataCache schema3 = buildSchemaMarkAndRefreshLatch(); Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); DatasourceTable.PhysicalDatasourceMetadata fooTable = schema3.getDatasource("foo"); Assert.assertNotNull(fooTable); @@ -1061,7 +1208,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon @Test public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException { - SegmentMetadataCache schema = buildSchema3(); + SegmentMetadataCache schema = buildSchemaMarkAndRefreshLatch(); Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); DatasourceTable.PhysicalDatasourceMetadata fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); @@ -1291,7 +1438,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon @Test public void testStaleDatasourceRefresh() throws IOException, InterruptedException { - SegmentMetadataCache schema = buildSchema1(); + SegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); Set segments = new HashSet<>(); Set datasources = new HashSet<>(); datasources.add("wat");