From 9b598407c1a0fced304f2cfe59d16a0973ba4245 Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Wed, 22 Dec 2021 10:47:57 -0600 Subject: [PATCH] Add interface for external schema provider to Druid SQL (#12043) * Add interfce for external schema provider to Druid SQL * Add annotations --- ...ruidSchemaInternRowSignatureBenchmark.java | 3 +- .../druid/sql/calcite/schema/DruidSchema.java | 31 +++++++++++++-- .../calcite/schema/DruidSchemaManager.java | 38 +++++++++++++++++++ .../schema/NoopDruidSchemaManager.java | 38 +++++++++++++++++++ .../org/apache/druid/sql/guice/SqlModule.java | 15 ++++++++ .../sql/calcite/BaseCalciteQueryTest.java | 2 + .../schema/DruidCalciteSchemaModuleTest.java | 1 + .../schema/DruidSchemaConcurrencyTest.java | 6 ++- .../schema/DruidSchemaNoDataInitTest.java | 3 +- .../sql/calcite/schema/DruidSchemaTest.java | 36 ++++++++++++------ .../sql/calcite/schema/SystemSchemaTest.java | 3 +- .../druid/sql/calcite/util/CalciteTests.java | 13 +++++-- 12 files changed, 164 insertions(+), 25 deletions(-) create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java index d84d6e34925..e05d1549a42 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java @@ -90,7 +90,8 @@ public class DruidSchemaInternRowSignatureBenchmark joinableFactory, config, escalator, - brokerInternalQueryConfig + brokerInternalQueryConfig, + null ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index d45bc6788c0..2cf8b609ec4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -107,6 +107,7 @@ public class DruidSchema extends AbstractSchema private final JoinableFactory joinableFactory; private final ExecutorService cacheExec; private final ExecutorService callbackExec; + private final DruidSchemaManager druidSchemaManager; /** * Map of DataSource -> DruidTable. @@ -212,7 +213,8 @@ public class DruidSchema extends AbstractSchema final JoinableFactory joinableFactory, final PlannerConfig config, final Escalator escalator, - final BrokerInternalQueryConfig brokerInternalQueryConfig + final BrokerInternalQueryConfig brokerInternalQueryConfig, + final DruidSchemaManager druidSchemaManager ) { this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); @@ -224,7 +226,15 @@ public class DruidSchema extends AbstractSchema this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d"); this.escalator = escalator; this.brokerInternalQueryConfig = brokerInternalQueryConfig; + this.druidSchemaManager = druidSchemaManager; + if (druidSchemaManager == null || druidSchemaManager instanceof NoopDruidSchemaManager) { + initServerViewTimelineCallback(serverView); + } + } + + private void initServerViewTimelineCallback(final TimelineServerView serverView) + { serverView.registerTimelineCallback( callbackExec, new TimelineServerView.TimelineCallback() @@ -267,8 +277,7 @@ public class DruidSchema extends AbstractSchema ); } - @LifecycleStart - public void start() throws InterruptedException + private void startCacheExec() { cacheExec.submit( () -> { @@ -361,6 +370,16 @@ public class DruidSchema extends AbstractSchema } } ); + } + + @LifecycleStart + public void start() throws InterruptedException + { + if (druidSchemaManager == null || druidSchemaManager instanceof NoopDruidSchemaManager) { + startCacheExec(); + } else { + initialized.countDown(); + } if (config.isAwaitInitializationOnStart()) { final long startNanos = System.nanoTime(); @@ -414,7 +433,11 @@ public class DruidSchema extends AbstractSchema @Override protected Map getTableMap() { - return ImmutableMap.copyOf(tables); + if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) { + return ImmutableMap.copyOf(druidSchemaManager.getTables()); + } else { + return ImmutableMap.copyOf(tables); + } } @VisibleForTesting diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java new file mode 100644 index 00000000000..dd3e7380312 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchemaManager.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.schema; + +import org.apache.druid.guice.annotations.ExtensionPoint; +import org.apache.druid.guice.annotations.UnstableApi; +import org.apache.druid.sql.calcite.table.DruidTable; + +import java.util.concurrent.ConcurrentMap; + +/** + * This interface provides a map of datasource names to {@link DruidTable} objects, used by the {@link DruidSchema} + * class as the SQL planner's view of Druid datasource schemas. If a non-default implementation is provided, + * the segment metadata polling-based view of the Druid tables will not be built in DruidSchema. + */ +@ExtensionPoint +@UnstableApi +public interface DruidSchemaManager +{ + ConcurrentMap getTables(); +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java new file mode 100644 index 00000000000..4d62dfec6fe --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/NoopDruidSchemaManager.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.schema; + +import org.apache.druid.sql.calcite.table.DruidTable; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class NoopDruidSchemaManager implements DruidSchemaManager +{ + public static final String TYPE = "noop"; + + private static ConcurrentMap MAP = new ConcurrentHashMap<>(); + + @Override + public ConcurrentMap getTables() + { + return MAP; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java index da12e7d0f45..6e8acdd4e8d 100644 --- a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java +++ b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java @@ -33,6 +33,8 @@ import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConver import org.apache.druid.sql.calcite.planner.CalcitePlannerModule; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.schema.DruidCalciteSchemaModule; +import org.apache.druid.sql.calcite.schema.DruidSchemaManager; +import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager; import org.apache.druid.sql.calcite.view.DruidViewModule; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.sql.calcite.view.ViewManager; @@ -46,6 +48,7 @@ public class SqlModule implements Module public static final String PROPERTY_SQL_ENABLE_JSON_OVER_HTTP = "druid.sql.http.enable"; public static final String PROPERTY_SQL_ENABLE_AVATICA = "druid.sql.avatica.enable"; public static final String PROPERTY_SQL_VIEW_MANAGER_TYPE = "druid.sql.viewmanager.type"; + public static final String PROPERTY_SQL_SCHEMA_MANAGER_TYPE = "druid.sql.schemamanager.type"; public static final String PROPERTY_SQL_APPROX_COUNT_DISTINCT_CHOICE = "druid.sql.approxCountDistinct.function"; @Inject @@ -82,6 +85,18 @@ public class SqlModule implements Module NoopViewManager.TYPE ); + PolyBind.optionBinder(binder, Key.get(DruidSchemaManager.class)) + .addBinding(NoopDruidSchemaManager.TYPE) + .to(NoopDruidSchemaManager.class) + .in(LazySingleton.class); + + PolyBind.createChoiceWithDefault( + binder, + PROPERTY_SQL_SCHEMA_MANAGER_TYPE, + Key.get(DruidSchemaManager.class), + NoopDruidSchemaManager.TYPE + ); + binder.install(new DruidCalciteSchemaModule()); binder.install(new CalcitePlannerModule()); binder.install(new SqlAggregationModule()); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 5adaa127892..fc21872c013 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -88,6 +88,7 @@ import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog; +import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager; import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.QueryLogHook; @@ -995,6 +996,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase walker, plannerConfig, viewManager, + new NoopDruidSchemaManager(), authorizerMapper ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index 2553f998d38..70a5862e3d9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -119,6 +119,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase .annotatedWith(IndexingService.class) .toInstance(overlordDruidLeaderClient); binder.bind(DruidNodeDiscoveryProvider.class).toInstance(druidNodeDiscoveryProvider); + binder.bind(DruidSchemaManager.class).toInstance(new NoopDruidSchemaManager()); binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(objectMapper); binder.bindScope(LazySingleton.class, Scopes.SINGLETON); binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupReferencesManager); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java index 67a626dc536..6beda7d0bd0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java @@ -137,7 +137,8 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -241,7 +242,8 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index b2c1981c1b4..3f2c36f7b82 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -59,7 +59,8 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ); druidSchema.start(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 38654b212be..3e89d812299 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -181,7 +181,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon ), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -207,7 +208,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @@ -485,7 +487,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -527,7 +530,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -573,7 +577,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -616,7 +621,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -656,7 +662,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -713,7 +720,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -773,7 +781,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -807,7 +816,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -854,7 +864,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ) { @Override @@ -1076,7 +1087,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon ), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - brokerInternalQueryConfig + brokerInternalQueryConfig, + null ); EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 9ce4852f7bf..678b1a0da28 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -253,7 +253,8 @@ public class SystemSchemaTest extends CalciteTestBase new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopEscalator(), - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + null ); druidSchema.start(); druidSchema.awaitInitialization(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index a91d345c7cb..67901a8e450 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -123,6 +123,7 @@ import org.apache.druid.sql.calcite.run.NativeQueryMakerFactory; import org.apache.druid.sql.calcite.run.QueryMakerFactory; import org.apache.druid.sql.calcite.schema.DruidSchema; import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog; +import org.apache.druid.sql.calcite.schema.DruidSchemaManager; import org.apache.druid.sql.calcite.schema.InformationSchema; import org.apache.druid.sql.calcite.schema.LookupSchema; import org.apache.druid.sql.calcite.schema.MetadataSegmentView; @@ -131,6 +132,7 @@ import org.apache.druid.sql.calcite.schema.NamedLookupSchema; import org.apache.druid.sql.calcite.schema.NamedSchema; import org.apache.druid.sql.calcite.schema.NamedSystemSchema; import org.apache.druid.sql.calcite.schema.NamedViewSchema; +import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager; import org.apache.druid.sql.calcite.schema.SystemSchema; import org.apache.druid.sql.calcite.schema.ViewSchema; import org.apache.druid.sql.calcite.view.DruidViewMacroFactory; @@ -1164,7 +1166,7 @@ public class CalciteTests final AuthorizerMapper authorizerMapper ) { - return createMockRootSchema(conglomerate, walker, plannerConfig, null, authorizerMapper); + return createMockRootSchema(conglomerate, walker, plannerConfig, null, new NoopDruidSchemaManager(), authorizerMapper); } public static DruidSchemaCatalog createMockRootSchema( @@ -1172,10 +1174,11 @@ public class CalciteTests final SpecificSegmentsQuerySegmentWalker walker, final PlannerConfig plannerConfig, @Nullable final ViewManager viewManager, + final DruidSchemaManager druidSchemaManager, final AuthorizerMapper authorizerMapper ) { - DruidSchema druidSchema = createMockSchema(conglomerate, walker, plannerConfig); + DruidSchema druidSchema = createMockSchema(conglomerate, walker, plannerConfig, druidSchemaManager); SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig, authorizerMapper); @@ -1216,7 +1219,8 @@ public class CalciteTests private static DruidSchema createMockSchema( final QueryRunnerFactoryConglomerate conglomerate, final SpecificSegmentsQuerySegmentWalker walker, - final PlannerConfig plannerConfig + final PlannerConfig plannerConfig, + final DruidSchemaManager druidSchemaManager ) { final DruidSchema schema = new DruidSchema( @@ -1233,7 +1237,8 @@ public class CalciteTests createDefaultJoinableFactory(), plannerConfig, TEST_AUTHENTICATOR_ESCALATOR, - new BrokerInternalQueryConfig() + new BrokerInternalQueryConfig(), + druidSchemaManager ); try {