Add interface for external schema provider to Druid SQL (#12043)

* Add interfce for external schema provider to Druid SQL

* Add annotations
This commit is contained in:
Jonathan Wei 2021-12-22 10:47:57 -06:00 committed by GitHub
parent 1871a1ab18
commit 9b598407c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 164 additions and 25 deletions

View File

@ -90,7 +90,8 @@ public class DruidSchemaInternRowSignatureBenchmark
joinableFactory,
config,
escalator,
brokerInternalQueryConfig
brokerInternalQueryConfig,
null
);
}

View File

@ -107,6 +107,7 @@ public class DruidSchema extends AbstractSchema
private final JoinableFactory joinableFactory;
private final ExecutorService cacheExec;
private final ExecutorService callbackExec;
private final DruidSchemaManager druidSchemaManager;
/**
* Map of DataSource -> DruidTable.
@ -212,7 +213,8 @@ public class DruidSchema extends AbstractSchema
final JoinableFactory joinableFactory,
final PlannerConfig config,
final Escalator escalator,
final BrokerInternalQueryConfig brokerInternalQueryConfig
final BrokerInternalQueryConfig brokerInternalQueryConfig,
final DruidSchemaManager druidSchemaManager
)
{
this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
@ -224,7 +226,15 @@ public class DruidSchema extends AbstractSchema
this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
this.escalator = escalator;
this.brokerInternalQueryConfig = brokerInternalQueryConfig;
this.druidSchemaManager = druidSchemaManager;
if (druidSchemaManager == null || druidSchemaManager instanceof NoopDruidSchemaManager) {
initServerViewTimelineCallback(serverView);
}
}
private void initServerViewTimelineCallback(final TimelineServerView serverView)
{
serverView.registerTimelineCallback(
callbackExec,
new TimelineServerView.TimelineCallback()
@ -267,8 +277,7 @@ public class DruidSchema extends AbstractSchema
);
}
@LifecycleStart
public void start() throws InterruptedException
private void startCacheExec()
{
cacheExec.submit(
() -> {
@ -361,6 +370,16 @@ public class DruidSchema extends AbstractSchema
}
}
);
}
@LifecycleStart
public void start() throws InterruptedException
{
if (druidSchemaManager == null || druidSchemaManager instanceof NoopDruidSchemaManager) {
startCacheExec();
} else {
initialized.countDown();
}
if (config.isAwaitInitializationOnStart()) {
final long startNanos = System.nanoTime();
@ -414,7 +433,11 @@ public class DruidSchema extends AbstractSchema
@Override
protected Map<String, Table> getTableMap()
{
return ImmutableMap.copyOf(tables);
if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) {
return ImmutableMap.copyOf(druidSchemaManager.getTables());
} else {
return ImmutableMap.copyOf(tables);
}
}
@VisibleForTesting

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.schema;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.sql.calcite.table.DruidTable;
import java.util.concurrent.ConcurrentMap;
/**
* This interface provides a map of datasource names to {@link DruidTable} objects, used by the {@link DruidSchema}
* class as the SQL planner's view of Druid datasource schemas. If a non-default implementation is provided,
* the segment metadata polling-based view of the Druid tables will not be built in DruidSchema.
*/
@ExtensionPoint
@UnstableApi
public interface DruidSchemaManager
{
ConcurrentMap<String, DruidTable> getTables();
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.schema;
import org.apache.druid.sql.calcite.table.DruidTable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class NoopDruidSchemaManager implements DruidSchemaManager
{
public static final String TYPE = "noop";
private static ConcurrentMap<String, DruidTable> MAP = new ConcurrentHashMap<>();
@Override
public ConcurrentMap<String, DruidTable> getTables()
{
return MAP;
}
}

View File

@ -33,6 +33,8 @@ import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConver
import org.apache.druid.sql.calcite.planner.CalcitePlannerModule;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.schema.DruidCalciteSchemaModule;
import org.apache.druid.sql.calcite.schema.DruidSchemaManager;
import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
import org.apache.druid.sql.calcite.view.DruidViewModule;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.sql.calcite.view.ViewManager;
@ -46,6 +48,7 @@ public class SqlModule implements Module
public static final String PROPERTY_SQL_ENABLE_JSON_OVER_HTTP = "druid.sql.http.enable";
public static final String PROPERTY_SQL_ENABLE_AVATICA = "druid.sql.avatica.enable";
public static final String PROPERTY_SQL_VIEW_MANAGER_TYPE = "druid.sql.viewmanager.type";
public static final String PROPERTY_SQL_SCHEMA_MANAGER_TYPE = "druid.sql.schemamanager.type";
public static final String PROPERTY_SQL_APPROX_COUNT_DISTINCT_CHOICE = "druid.sql.approxCountDistinct.function";
@Inject
@ -82,6 +85,18 @@ public class SqlModule implements Module
NoopViewManager.TYPE
);
PolyBind.optionBinder(binder, Key.get(DruidSchemaManager.class))
.addBinding(NoopDruidSchemaManager.TYPE)
.to(NoopDruidSchemaManager.class)
.in(LazySingleton.class);
PolyBind.createChoiceWithDefault(
binder,
PROPERTY_SQL_SCHEMA_MANAGER_TYPE,
Key.get(DruidSchemaManager.class),
NoopDruidSchemaManager.TYPE
);
binder.install(new DruidCalciteSchemaModule());
binder.install(new CalcitePlannerModule());
binder.install(new SqlAggregationModule());

View File

@ -88,6 +88,7 @@ import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
@ -995,6 +996,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
walker,
plannerConfig,
viewManager,
new NoopDruidSchemaManager(),
authorizerMapper
);

View File

@ -119,6 +119,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase
.annotatedWith(IndexingService.class)
.toInstance(overlordDruidLeaderClient);
binder.bind(DruidNodeDiscoveryProvider.class).toInstance(druidNodeDiscoveryProvider);
binder.bind(DruidSchemaManager.class).toInstance(new NoopDruidSchemaManager());
binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(objectMapper);
binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupReferencesManager);

View File

@ -137,7 +137,8 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -241,7 +242,8 @@ public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override

View File

@ -59,7 +59,8 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
);
druidSchema.start();

View File

@ -181,7 +181,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -207,7 +208,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@ -485,7 +487,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -527,7 +530,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -573,7 +577,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -616,7 +621,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -656,7 +662,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -713,7 +720,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -773,7 +781,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -807,7 +816,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -854,7 +864,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
)
{
@Override
@ -1076,7 +1087,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
brokerInternalQueryConfig
brokerInternalQueryConfig,
null
);
EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();

View File

@ -253,7 +253,8 @@ public class SystemSchemaTest extends CalciteTestBase
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator(),
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
null
);
druidSchema.start();
druidSchema.awaitInitialization();

View File

@ -123,6 +123,7 @@ import org.apache.druid.sql.calcite.run.NativeQueryMakerFactory;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaManager;
import org.apache.druid.sql.calcite.schema.InformationSchema;
import org.apache.druid.sql.calcite.schema.LookupSchema;
import org.apache.druid.sql.calcite.schema.MetadataSegmentView;
@ -131,6 +132,7 @@ import org.apache.druid.sql.calcite.schema.NamedLookupSchema;
import org.apache.druid.sql.calcite.schema.NamedSchema;
import org.apache.druid.sql.calcite.schema.NamedSystemSchema;
import org.apache.druid.sql.calcite.schema.NamedViewSchema;
import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.schema.ViewSchema;
import org.apache.druid.sql.calcite.view.DruidViewMacroFactory;
@ -1164,7 +1166,7 @@ public class CalciteTests
final AuthorizerMapper authorizerMapper
)
{
return createMockRootSchema(conglomerate, walker, plannerConfig, null, authorizerMapper);
return createMockRootSchema(conglomerate, walker, plannerConfig, null, new NoopDruidSchemaManager(), authorizerMapper);
}
public static DruidSchemaCatalog createMockRootSchema(
@ -1172,10 +1174,11 @@ public class CalciteTests
final SpecificSegmentsQuerySegmentWalker walker,
final PlannerConfig plannerConfig,
@Nullable final ViewManager viewManager,
final DruidSchemaManager druidSchemaManager,
final AuthorizerMapper authorizerMapper
)
{
DruidSchema druidSchema = createMockSchema(conglomerate, walker, plannerConfig);
DruidSchema druidSchema = createMockSchema(conglomerate, walker, plannerConfig, druidSchemaManager);
SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig, authorizerMapper);
@ -1216,7 +1219,8 @@ public class CalciteTests
private static DruidSchema createMockSchema(
final QueryRunnerFactoryConglomerate conglomerate,
final SpecificSegmentsQuerySegmentWalker walker,
final PlannerConfig plannerConfig
final PlannerConfig plannerConfig,
final DruidSchemaManager druidSchemaManager
)
{
final DruidSchema schema = new DruidSchema(
@ -1233,7 +1237,8 @@ public class CalciteTests
createDefaultJoinableFactory(),
plannerConfig,
TEST_AUTHENTICATOR_ESCALATOR,
new BrokerInternalQueryConfig()
new BrokerInternalQueryConfig(),
druidSchemaManager
);
try {