Broker: Await initialization before finishing startup. (#6742)

* Broker: Await initialization before finishing startup.

In particular, hold off on announcing the service and starting the
HTTP server until the server view and SQL metadata cache have finished
initializing. This closes a window of time shortly after startup during
which a Broker could return partial results. (A simplified sketch of the
gating pattern follows the commit summary below.)

As part of this, the server-lifecycle service announcements have been
simplified. This helps ensure that the two different kinds of
announcements we do (legacy and new-style) stay in sync.

* Remove unused imports.

* Fix NPE in ServerRunnable.
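
For illustration, here is a minimal sketch of the gating pattern this change introduces. The class and method names below (InitializationGate, markInitialized, awaitInitializationIfConfigured) are made up for the example; the real patch applies the same shape to BrokerServerView and DruidSchema, as shown in the diff that follows: the view counts down a latch once its initial sync completes, and the lifecycle start method blocks on that latch when awaitInitializationOnStart is enabled.

import java.util.concurrent.CountDownLatch;

class InitializationGate
{
  private final CountDownLatch initialized = new CountDownLatch(1);
  private final boolean awaitInitializationOnStart;

  InitializationGate(final boolean awaitInitializationOnStart)
  {
    this.awaitInitializationOnStart = awaitInitializationOnStart;
  }

  // Called from the inventory-view callback once the initial segment sync finishes.
  void markInitialized()
  {
    initialized.countDown();
  }

  // Non-blocking check, equivalent to the old volatile boolean flag.
  boolean isInitialized()
  {
    return initialized.getCount() == 0;
  }

  // Called from the @LifecycleStart method; blocks until the view is ready.
  void awaitInitializationIfConfigured() throws InterruptedException
  {
    if (awaitInitializationOnStart) {
      initialized.await();
    }
  }
}

Because the wait happens inside a @LifecycleStart method, components that start later in the lifecycle, in particular the HTTP server and the service announcement, do not come up until the latch is released.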
Gian Merlino 2018-12-18 20:32:31 -08:00 committed by GitHub
parent f12a1aa993
commit 7a09cde4de
14 changed files with 207 additions and 162 deletions


@@ -1258,6 +1258,7 @@ The Druid SQL server is configured through the following properties on the broke
|`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|1|
|`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
|`druid.sql.planner.awaitInitializationOnStart`|Whether the Broker will wait for its SQL metadata view to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the metadata view is initialized. See also `druid.broker.segment.awaitInitializationOnStart`, a related setting.|true|
|`druid.sql.planner.maxQueryCount`|Maximum number of queries to issue, including nested queries. Set to 1 to disable sub-queries, or set to 0 for unlimited.|8|
|`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.html). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.html) instead.|100000|
@@ -1291,6 +1292,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti
|`druid.announcer.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch|
|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true|
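
For illustration (this snippet is not part of the documentation diff), both waits default to true; an operator who wants the previous behavior back could disable them in the Broker's runtime.properties:

druid.broker.segment.awaitInitializationOnStart=false
druid.sql.planner.awaitInitializationOnStart=false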
## Historical


@@ -33,6 +33,9 @@ public class BrokerSegmentWatcherConfig
@JsonProperty
private Set<String> watchedDataSources = null;
@JsonProperty
private boolean awaitInitializationOnStart = true;
public Set<String> getWatchedTiers()
{
return watchedTiers;
@@ -42,4 +45,9 @@ public class BrokerSegmentWatcherConfig
{
return watchedDataSources;
}
public boolean isAwaitInitializationOnStart()
{
return awaitInitializationOnStart;
}
}


@@ -27,10 +27,12 @@ import com.google.inject.Inject;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -49,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
@@ -56,6 +59,7 @@ import java.util.stream.Collectors;
/**
*/
@ManageLifecycle
public class BrokerServerView implements TimelineServerView
{
private static final Logger log = new Logger(BrokerServerView.class);
@@ -74,19 +78,20 @@ public class BrokerServerView implements TimelineServerView
private final FilteredServerInventoryView baseView;
private final TierSelectorStrategy tierSelectorStrategy;
private final ServiceEmitter emitter;
private final BrokerSegmentWatcherConfig segmentWatcherConfig;
private final Predicate<Pair<DruidServerMetadata, DataSegment>> segmentFilter;
private volatile boolean initialized = false;
private final CountDownLatch initialized = new CountDownLatch(1);
@Inject
public BrokerServerView(
QueryToolChestWarehouse warehouse,
QueryWatcher queryWatcher,
@Smile ObjectMapper smileMapper,
@EscalatedClient HttpClient httpClient,
FilteredServerInventoryView baseView,
TierSelectorStrategy tierSelectorStrategy,
ServiceEmitter emitter,
final QueryToolChestWarehouse warehouse,
final QueryWatcher queryWatcher,
final @Smile ObjectMapper smileMapper,
final @EscalatedClient HttpClient httpClient,
final FilteredServerInventoryView baseView,
final TierSelectorStrategy tierSelectorStrategy,
final ServiceEmitter emitter,
final BrokerSegmentWatcherConfig segmentWatcherConfig
)
{
@@ -97,6 +102,7 @@ public class BrokerServerView implements TimelineServerView
this.baseView = baseView;
this.tierSelectorStrategy = tierSelectorStrategy;
this.emitter = emitter;
this.segmentWatcherConfig = segmentWatcherConfig;
this.clients = new ConcurrentHashMap<>();
this.selectors = new HashMap<>();
this.timelines = new HashMap<>();
@@ -143,7 +149,7 @@ public class BrokerServerView implements TimelineServerView
@Override
public CallbackAction segmentViewInitialized()
{
initialized = true;
initialized.countDown();
runTimelineCallbacks(TimelineCallback::timelineInitialized);
return ServerView.CallbackAction.CONTINUE;
}
@@ -165,9 +171,25 @@ public class BrokerServerView implements TimelineServerView
);
}
@LifecycleStart
public void start() throws InterruptedException
{
if (segmentWatcherConfig.isAwaitInitializationOnStart()) {
final long startMillis = System.currentTimeMillis();
log.info("%s waiting for initialization.", getClass().getSimpleName());
awaitInitialization();
log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), System.currentTimeMillis() - startMillis);
}
}
public boolean isInitialized()
{
return initialized;
return initialized.getCount() == 0;
}
public void awaitInitialization() throws InterruptedException
{
initialized.await();
}
private QueryableDruidServer addServer(DruidServer server)
@@ -183,7 +205,15 @@ public class BrokerServerView implements TimelineServerView
private DirectDruidClient makeDirectClient(DruidServer server)
{
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getScheme(), server.getHost(), emitter);
return new DirectDruidClient(
warehouse,
queryWatcher,
smileMapper,
httpClient,
server.getScheme(),
server.getHost(),
emitter
);
}
private QueryableDruidServer removeServer(DruidServer server)


@@ -1,90 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordination.broker;
import com.google.common.base.Predicates;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.server.DruidNode;
@ManageLifecycle
public class DruidBroker
{
private final DruidNode self;
private final ServiceAnnouncer serviceAnnouncer;
private volatile boolean started = false;
@Inject
public DruidBroker(
final FilteredServerInventoryView serverInventoryView,
final @Self DruidNode self,
final ServiceAnnouncer serviceAnnouncer
)
{
this.self = self;
this.serviceAnnouncer = serviceAnnouncer;
serverInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(),
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
serviceAnnouncer.announce(self);
return ServerView.CallbackAction.UNREGISTER;
}
},
// We are not interested in any segment callbacks except view initialization
Predicates.alwaysFalse()
);
}
@LifecycleStart
public void start()
{
synchronized (self) {
if (started) {
return;
}
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (self) {
if (!started) {
return;
}
serviceAnnouncer.unannounce(self);
started = false;
}
}
}


@@ -20,7 +20,6 @@
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
@@ -51,7 +50,6 @@ import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.server.BrokerQueryResource;
import org.apache.druid.server.ClientInfoResource;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.coordination.broker.DruidBroker;
import org.apache.druid.server.http.BrokerResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.MetricsModule;
@@ -94,7 +92,7 @@ public class CliBroker extends ServerRunnable
binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true);
binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
binder.bind(BrokerServerView.class).in(LazySingleton.class);
LifecycleModule.register(binder, BrokerServerView.class);
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
@@ -117,7 +115,6 @@ public class CliBroker extends ServerRunnable
Jerseys.addResource(binder, ClientInfoResource.class);
LifecycleModule.register(binder, BrokerQueryResource.class);
LifecycleModule.register(binder, DruidBroker.class);
Jerseys.addResource(binder, HttpServerInventoryViewResource.class);
@@ -125,11 +122,14 @@ public class CliBroker extends ServerRunnable
LifecycleModule.register(binder, Server.class);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.BROKER, ImmutableList.of(LookupNodeService.class)))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
bindAnnouncer(
binder,
DiscoverySideEffectsProvider.builder(NodeType.BROKER)
.serviceClasses(ImmutableList.of(LookupNodeService.class))
.useLegacyAnnouncer(true)
.build()
);
},
new LookupModule(),
new SqlModule()


@@ -21,10 +21,8 @@ package org.apache.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
@@ -217,12 +215,11 @@ public class CliCoordinator extends ServerRunnable
DruidCoordinatorCleanupPendingSegments.class
);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.annotatedWith(Coordinator.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.COORDINATOR, ImmutableList.of()))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class));
bindAnnouncer(
binder,
Coordinator.class,
DiscoverySideEffectsProvider.builder(NodeType.COORDINATOR).build()
);
}
@Provides


@@ -20,13 +20,11 @@
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CacheMonitor;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.CacheModule;
@@ -103,16 +101,12 @@ public class CliHistorical extends ServerRunnable
binder.install(new CacheModule());
MetricsModule.register(binder, CacheMonitor.class);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(
new DiscoverySideEffectsProvider(
NodeType.HISTORICAL,
ImmutableList.of(DataNodeService.class, LookupNodeService.class)
)
)
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
bindAnnouncer(
binder,
DiscoverySideEffectsProvider.builder(NodeType.HISTORICAL)
.serviceClasses(ImmutableList.of(LookupNodeService.class))
.build()
);
},
new LookupModule()
);


@@ -100,7 +100,7 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(IndexingServiceClient.class).toProvider(Providers.of(null));
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>(){})
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
.toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
PolyBind.createChoice(
@@ -130,13 +130,12 @@ public class CliMiddleManager extends ServerRunnable
LifecycleModule.register(binder, Server.class);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(
new DiscoverySideEffectsProvider(NodeType.MIDDLE_MANAGER, ImmutableList.of(WorkerNodeService.class))
)
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
bindAnnouncer(
binder,
DiscoverySideEffectsProvider.builder(NodeType.MIDDLE_MANAGER)
.serviceClasses(ImmutableList.of(WorkerNodeService.class))
.build()
);
}
@Provides


@@ -194,7 +194,7 @@ public class CliOverlord extends ServerRunnable
binder.bind(SupervisorManager.class).in(LazySingleton.class);
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>(){})
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
.toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
@@ -237,12 +237,11 @@ public class CliOverlord extends ServerRunnable
LifecycleModule.register(binder, Server.class);
}
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.annotatedWith(IndexingService.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.OVERLORD, ImmutableList.of()))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, IndexingService.class));
bindAnnouncer(
binder,
IndexingService.class,
DiscoverySideEffectsProvider.builder(NodeType.OVERLORD).build()
);
}
private void configureTaskStorage(Binder binder)
@@ -284,10 +283,14 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME).to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME)
.to(RemoteTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(HttpRemoteTaskRunnerFactory.TYPE_NAME).to(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(HttpRemoteTaskRunnerFactory.TYPE_NAME)
.to(HttpRemoteTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);


@@ -21,7 +21,6 @@ package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
@@ -114,11 +113,10 @@ public class CliRouter extends ServerRunnable
LifecycleModule.register(binder, Server.class);
DiscoveryModule.register(binder, Self.class);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.ROUTER, ImmutableList.of()))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
bindAnnouncer(
binder,
DiscoverySideEffectsProvider.builder(NodeType.ROUTER).build()
);
}
},
new LookupModule()


@@ -20,19 +20,26 @@
package org.apache.druid.cli;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provider;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.DruidService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import java.lang.annotation.Annotation;
import java.util.List;
/**
@@ -58,6 +65,32 @@ public abstract class ServerRunnable extends GuiceRunnable
}
}
public static void bindAnnouncer(
final Binder binder,
final DiscoverySideEffectsProvider provider
)
{
binder.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(provider)
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
}
public static void bindAnnouncer(
final Binder binder,
final Class<? extends Annotation> annotation,
final DiscoverySideEffectsProvider provider
)
{
binder.bind(DiscoverySideEffectsProvider.Child.class)
.annotatedWith(annotation)
.toProvider(provider)
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, annotation));
}
/**
* This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode}
* as part of {@link Lifecycle.Stage#LAST}.
@@ -66,12 +99,50 @@ public abstract class ServerRunnable extends GuiceRunnable
{
public static class Child {}
@Inject @Self
public static class Builder
{
private NodeType nodeType;
private List<Class<? extends DruidService>> serviceClasses = ImmutableList.of();
private boolean useLegacyAnnouncer;
public Builder(final NodeType nodeType)
{
this.nodeType = nodeType;
}
public Builder serviceClasses(final List<Class<? extends DruidService>> serviceClasses)
{
this.serviceClasses = serviceClasses;
return this;
}
public Builder useLegacyAnnouncer(final boolean useLegacyAnnouncer)
{
this.useLegacyAnnouncer = useLegacyAnnouncer;
return this;
}
public DiscoverySideEffectsProvider build()
{
return new DiscoverySideEffectsProvider(nodeType, serviceClasses, useLegacyAnnouncer);
}
}
public static Builder builder(final NodeType nodeType)
{
return new Builder(nodeType);
}
@Inject
@Self
private DruidNode druidNode;
@Inject
private DruidNodeAnnouncer announcer;
@Inject
private ServiceAnnouncer legacyAnnouncer;
@Inject
private Lifecycle lifecycle;
@@ -80,11 +151,17 @@ public abstract class ServerRunnable extends GuiceRunnable
private final NodeType nodeType;
private final List<Class<? extends DruidService>> serviceClasses;
private final boolean useLegacyAnnouncer;
public DiscoverySideEffectsProvider(NodeType nodeType, List<Class<? extends DruidService>> serviceClasses)
private DiscoverySideEffectsProvider(
final NodeType nodeType,
final List<Class<? extends DruidService>> serviceClasses,
final boolean useLegacyAnnouncer
)
{
this.nodeType = nodeType;
this.serviceClasses = serviceClasses;
this.useLegacyAnnouncer = useLegacyAnnouncer;
}
@Override
@@ -105,11 +182,21 @@ public abstract class ServerRunnable extends GuiceRunnable
public void start()
{
announcer.announce(discoveryDruidNode);
if (useLegacyAnnouncer) {
legacyAnnouncer.announce(discoveryDruidNode.getDruidNode());
}
}
@Override
public void stop()
{
// Reverse order vs. start().
if (useLegacyAnnouncer) {
legacyAnnouncer.unannounce(discoveryDruidNode.getDruidNode());
}
announcer.unannounce(discoveryDruidNode);
}
},


@@ -60,6 +60,9 @@ public class PlannerConfig
@JsonProperty
private boolean requireTimeCondition = false;
@JsonProperty
private boolean awaitInitializationOnStart = true;
@JsonProperty
private DateTimeZone sqlTimeZone = DateTimeZone.UTC;
@@ -113,6 +116,11 @@ public class PlannerConfig
return sqlTimeZone;
}
public boolean isAwaitInitializationOnStart()
{
return awaitInitializationOnStart;
}
public PlannerConfig withOverrides(final Map<String, Object> context)
{
if (context == null) {
@@ -142,6 +150,7 @@ public class PlannerConfig
);
newConfig.requireTimeCondition = isRequireTimeCondition();
newConfig.sqlTimeZone = getSqlTimeZone();
newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart();
return newConfig;
}
@@ -181,6 +190,7 @@ public class PlannerConfig
useApproximateTopN == that.useApproximateTopN &&
useFallback == that.useFallback &&
requireTimeCondition == that.requireTimeCondition &&
awaitInitializationOnStart == that.awaitInitializationOnStart &&
Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) &&
Objects.equals(sqlTimeZone, that.sqlTimeZone);
}
@@ -199,6 +209,7 @@ public class PlannerConfig
useApproximateTopN,
useFallback,
requireTimeCondition,
awaitInitializationOnStart,
sqlTimeZone
);
}
@@ -216,6 +227,7 @@ public class PlannerConfig
", useApproximateTopN=" + useApproximateTopN +
", useFallback=" + useFallback +
", requireTimeCondition=" + requireTimeCondition +
", awaitInitializationOnStart=" + awaitInitializationOnStart +
", sqlTimeZone=" + sqlTimeZone +
'}';
}


@@ -19,7 +19,6 @@
package org.apache.druid.sql.calcite.schema;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
@@ -98,7 +97,7 @@ public class DruidSchema extends AbstractSchema
private final ConcurrentMap<String, DruidTable> tables;
// For awaitInitialization.
private final CountDownLatch initializationLatch = new CountDownLatch(1);
private final CountDownLatch initialized = new CountDownLatch(1);
// Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
private final Object lock = new Object();
@@ -175,7 +174,7 @@ public class DruidSchema extends AbstractSchema
}
@LifecycleStart
public void start()
public void start() throws InterruptedException
{
cacheExec.submit(
new Runnable()
@@ -254,7 +253,7 @@ public class DruidSchema extends AbstractSchema
}
}
initializationLatch.countDown();
initialized.countDown();
}
catch (InterruptedException e) {
// Fall through.
@@ -288,6 +287,13 @@ public class DruidSchema extends AbstractSchema
}
}
);
if (config.isAwaitInitializationOnStart()) {
final long startMillis = System.currentTimeMillis();
log.info("%s waiting for initialization.", getClass().getSimpleName());
awaitInitialization();
log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), System.currentTimeMillis() - startMillis);
}
}
@LifecycleStop
@@ -296,10 +302,9 @@ public class DruidSchema extends AbstractSchema
cacheExec.shutdownNow();
}
@VisibleForTesting
public void awaitInitialization() throws InterruptedException
{
initializationLatch.await();
initialized.await();
}
@Override


@@ -566,8 +566,8 @@ public class CalciteTests
TEST_AUTHENTICATOR_ESCALATOR
);
schema.start();
try {
schema.start();
schema.awaitInitialization();
}
catch (InterruptedException e) {