druid.coordinator.asOverlord.enabled flag at coordinator to make it an overlord too (#3711)

This commit is contained in:
Himanshu 2017-02-13 17:03:59 -06:00 committed by Fangjin Yang
parent 9ab9feced6
commit 8cf7ad1e3a
15 changed files with 214 additions and 25 deletions

View File

@ -34,6 +34,8 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`| |`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|
|`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)| |`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)|
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this coordinator node should act like an overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone overlord nodes. If set to true, then be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord nodes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
### Metadata Retrieval ### Metadata Retrieval

View File

@ -37,6 +37,7 @@ import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelector;
@ -73,11 +74,12 @@ public class TaskMaster
final TaskLockbox taskLockbox, final TaskLockbox taskLockbox,
final TaskStorage taskStorage, final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory, final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode node, @Self final DruidNode selfNode,
final IndexerZkConfig zkPaths, final IndexerZkConfig zkPaths,
final TaskRunnerFactory runnerFactory, final TaskRunnerFactory runnerFactory,
final CuratorFramework curator, final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer, final ServiceAnnouncer serviceAnnouncer,
final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
final ServiceEmitter emitter, final ServiceEmitter emitter,
final SupervisorManager supervisorManager, final SupervisorManager supervisorManager,
final OverlordHelperManager overlordHelperManager final OverlordHelperManager overlordHelperManager
@ -85,6 +87,10 @@ public class TaskMaster
{ {
this.supervisorManager = supervisorManager; this.supervisorManager = supervisorManager;
this.taskActionClientFactory = taskActionClientFactory; this.taskActionClientFactory = taskActionClientFactory;
final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode :
selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService());
this.leaderSelector = new LeaderSelector( this.leaderSelector = new LeaderSelector(
curator, curator,
zkPaths.getLeaderLatchPath(), zkPaths.getLeaderLatchPath(),

View File

@ -40,7 +40,7 @@ public class OverlordRedirectInfo implements RedirectInfo
} }
@Override @Override
public boolean doLocal() public boolean doLocal(String requestURI)
{ {
return taskMaster.isLeading(); return taskMaster.isLeading();
} }

View File

@ -44,7 +44,7 @@ public class OverlordRedirectInfoTest
{ {
EasyMock.expect(taskMaster.isLeading()).andReturn(true).anyTimes(); EasyMock.expect(taskMaster.isLeading()).andReturn(true).anyTimes();
EasyMock.replay(taskMaster); EasyMock.replay(taskMaster);
Assert.assertTrue(redirectInfo.doLocal()); Assert.assertTrue(redirectInfo.doLocal(null));
EasyMock.verify(taskMaster); EasyMock.verify(taskMaster);
} }

View File

@ -53,6 +53,7 @@ import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.Pair; import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.metrics.NoopServiceEmitter;
@ -183,6 +184,7 @@ public class OverlordTest
announcementLatch.countDown(); announcementLatch.countDown();
} }
}, },
new CoordinatorOverlordServiceConfig(null, null),
serviceEmitter, serviceEmitter,
supervisorManager, supervisorManager,
EasyMock.createNiceMock(OverlordHelperManager.class) EasyMock.createNiceMock(OverlordHelperManager.class)

View File

@ -129,6 +129,11 @@ public class DruidNode
return port; return port;
} }
public DruidNode withService(String service)
{
return new DruidNode(service, host, port);
}
/** /**
* Returns host and port together as something that can be used as part of a URI. * Returns host and port together as something that can be used as part of a URI.
*/ */

View File

@ -0,0 +1,56 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
/**
*/
public class CoordinatorOverlordServiceConfig
{
@JsonProperty
private final boolean enabled;
@JsonProperty
private final String overlordService;
public CoordinatorOverlordServiceConfig(
@JsonProperty("enabled") Boolean enabled,
@JsonProperty("overlordService") String overlordService
)
{
this.enabled = enabled == null ? false : enabled.booleanValue();
this.overlordService = overlordService;
Preconditions.checkArgument((this.enabled && this.overlordService != null) || !this.enabled,
"coordinator is enabled to be overlord but overlordService is not specified");
}
public boolean isEnabled()
{
return enabled;
}
public String getOverlordService()
{
return overlordService;
}
}

View File

@ -37,7 +37,7 @@ public class CoordinatorRedirectInfo implements RedirectInfo
} }
@Override @Override
public boolean doLocal() public boolean doLocal(String requestURI)
{ {
return coordinator.isLeader(); return coordinator.isLeader();
} }

View File

@ -68,7 +68,7 @@ public class RedirectFilter implements Filter
throw new ServletException("non-HTTP request or response"); throw new ServletException("non-HTTP request or response");
} }
if (redirectInfo.doLocal()) { if (redirectInfo.doLocal(request.getRequestURI())) {
chain.doFilter(request, response); chain.doFilter(request, response);
} else { } else {
URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI()); URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI());

View File

@ -25,7 +25,7 @@ import java.net.URL;
*/ */
public interface RedirectInfo public interface RedirectInfo
{ {
public boolean doLocal(); public boolean doLocal(String requestURI);
public URL getRedirectURL(String queryString, String requestURI); public URL getRedirectURL(String queryString, String requestURI);
} }

View File

@ -44,7 +44,7 @@ public class CoordinatorRedirectInfoTest
{ {
EasyMock.expect(druidCoordinator.isLeader()).andReturn(true).anyTimes(); EasyMock.expect(druidCoordinator.isLeader()).andReturn(true).anyTimes();
EasyMock.replay(druidCoordinator); EasyMock.replay(druidCoordinator);
Assert.assertTrue(coordinatorRedirectInfo.doLocal()); Assert.assertTrue(coordinatorRedirectInfo.doLocal(null));
EasyMock.verify(druidCoordinator); EasyMock.verify(druidCoordinator);
} }

View File

@ -21,7 +21,6 @@ package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates; import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Module; import com.google.inject.Module;
@ -79,6 +78,7 @@ import io.druid.server.router.TieredBrokerConfig;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -94,6 +94,7 @@ public class CliCoordinator extends ServerRunnable
private static final Logger log = new Logger(CliCoordinator.class); private static final Logger log = new Logger(CliCoordinator.class);
private Properties properties; private Properties properties;
private boolean beOverlord;
public CliCoordinator() public CliCoordinator()
{ {
@ -104,12 +105,19 @@ public class CliCoordinator extends ServerRunnable
public void configure(Properties properties) public void configure(Properties properties)
{ {
this.properties = properties; this.properties = properties;
beOverlord = isOverlord(properties);
if (beOverlord) {
log.info("Coordinator is configured to act as Overlord as well.");
}
} }
@Override @Override
protected List<? extends Module> getModules() protected List<? extends Module> getModules()
{ {
return ImmutableList.<Module>of( List<Module> modules = new ArrayList<>();
modules.add(
new Module() new Module()
{ {
@Override @Override
@ -131,7 +139,11 @@ public class CliCoordinator extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class); JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class); binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class); if (beOverlord) {
binder.bind(RedirectInfo.class).to(CoordinatorOverlordRedirectInfo.class).in(LazySingleton.class);
} else {
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
}
binder.bind(MetadataSegmentManager.class) binder.bind(MetadataSegmentManager.class)
.toProvider(MetadataSegmentManagerProvider.class) .toProvider(MetadataSegmentManagerProvider.class)
@ -192,7 +204,6 @@ public class CliCoordinator extends ServerRunnable
Predicates.equalTo("true"), Predicates.equalTo("true"),
DruidCoordinatorSegmentKiller.class DruidCoordinatorSegmentKiller.class
); );
} }
@Provides @Provides
@ -211,5 +222,16 @@ public class CliCoordinator extends ServerRunnable
} }
} }
); );
if (beOverlord) {
modules.addAll(new CliOverlord().getModules(false));
}
return modules;
}
public static boolean isOverlord(Properties properties)
{
return Boolean.valueOf(properties.getProperty("druid.coordinator.asOverlord.enabled")).booleanValue();
} }
} }

View File

@ -79,6 +79,7 @@ import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.audit.AuditManagerProvider; import io.druid.server.audit.AuditManagerProvider;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo; import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.jetty.JettyServerInitUtils; import io.druid.server.initialization.jetty.JettyServerInitUtils;
@ -113,6 +114,11 @@ public class CliOverlord extends ServerRunnable
@Override @Override
protected List<? extends Module> getModules() protected List<? extends Module> getModules()
{
return getModules(true);
}
protected List<? extends Module> getModules(final boolean standalone)
{ {
return ImmutableList.<Module>of( return ImmutableList.<Module>of(
new Module() new Module()
@ -120,11 +126,14 @@ public class CliOverlord extends ServerRunnable
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
{ {
binder.bindConstant() if (standalone) {
.annotatedWith(Names.named("serviceName")) binder.bindConstant()
.to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME); .annotatedWith(Names.named("serviceName"))
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090); .to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);
}
JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", CoordinatorOverlordServiceConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
@ -160,14 +169,18 @@ public class CliOverlord extends ServerRunnable
.toProvider(AuditManagerProvider.class) .toProvider(AuditManagerProvider.class)
.in(ManageLifecycle.class); .in(ManageLifecycle.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class); if (standalone) {
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class); binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
}
binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
Jerseys.addResource(binder, OverlordResource.class); Jerseys.addResource(binder, OverlordResource.class);
Jerseys.addResource(binder, SupervisorResource.class); Jerseys.addResource(binder, SupervisorResource.class);
LifecycleModule.register(binder, Server.class); if (standalone) {
LifecycleModule.register(binder, Server.class);
}
} }
private void configureTaskStorage(Binder binder) private void configureTaskStorage(Binder binder)

View File

@ -37,16 +37,20 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.resource.ResourceCollection; import org.eclipse.jetty.util.resource.ResourceCollection;
import java.util.Properties;
/** /**
*/ */
class CoordinatorJettyServerInitializer implements JettyServerInitializer class CoordinatorJettyServerInitializer implements JettyServerInitializer
{ {
private final DruidCoordinatorConfig config; private final DruidCoordinatorConfig config;
private final boolean beOverlord;
@Inject @Inject
CoordinatorJettyServerInitializer(DruidCoordinatorConfig config) CoordinatorJettyServerInitializer(DruidCoordinatorConfig config, Properties properties)
{ {
this.config = config; this.config = config;
this.beOverlord = CliCoordinator.isOverlord(properties);
} }
@Override @Override
@ -59,10 +63,19 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer
root.addServlet(holderPwd, "/"); root.addServlet(holderPwd, "/");
if(config.getConsoleStatic() == null) { if(config.getConsoleStatic() == null) {
ResourceCollection staticResources = new ResourceCollection( ResourceCollection staticResources;
Resource.newClassPathResource("io/druid/console"), if (beOverlord) {
Resource.newClassPathResource("static") staticResources = new ResourceCollection(
); Resource.newClassPathResource("io/druid/console"),
Resource.newClassPathResource("static"),
Resource.newClassPathResource("indexer_static")
);
} else {
staticResources = new ResourceCollection(
Resource.newClassPathResource("io/druid/console"),
Resource.newClassPathResource("static")
);
}
root.setBaseResource(staticResources); root.setBaseResource(staticResources);
} else { } else {
// used for console development // used for console development
@ -81,10 +94,15 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer
// Can't use '/*' here because of Guice and Jetty static content conflicts // Can't use '/*' here because of Guice and Jetty static content conflicts
root.addFilter(GuiceFilter.class, "/info/*", null); root.addFilter(GuiceFilter.class, "/info/*", null);
root.addFilter(GuiceFilter.class, "/druid/coordinator/*", null); root.addFilter(GuiceFilter.class, "/druid/coordinator/*", null);
if (beOverlord) {
root.addFilter(GuiceFilter.class, "/druid/indexer/*", null);
}
// this will be removed in the next major release // this will be removed in the next major release
root.addFilter(GuiceFilter.class, "/coordinator/*", null); root.addFilter(GuiceFilter.class, "/coordinator/*", null);
root.addServlet(new ServletHolder(injector.getInstance(OverlordProxyServlet.class)), "/druid/indexer/*"); if (!beOverlord) {
root.addServlet(new ServletHolder(injector.getInstance(OverlordProxyServlet.class)), "/druid/indexer/*");
}
HandlerList handlerList = new HandlerList(); HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.getJettyRequestLogHandler(), root}); handlerList.setHandlers(new Handler[]{JettyServerInitUtils.getJettyRequestLogHandler(), root});

View File

@ -0,0 +1,65 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.cli;
import com.google.inject.Inject;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.http.OverlordRedirectInfo;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.http.CoordinatorRedirectInfo;
import io.druid.server.http.RedirectInfo;
import java.net.URL;
/**
*/
public class CoordinatorOverlordRedirectInfo implements RedirectInfo
{
private final OverlordRedirectInfo overlordRedirectInfo;
private final CoordinatorRedirectInfo coordinatorRedirectInfo;
@Inject
public CoordinatorOverlordRedirectInfo(TaskMaster taskMaster, DruidCoordinator druidCoordinator)
{
this.overlordRedirectInfo = new OverlordRedirectInfo(taskMaster);
this.coordinatorRedirectInfo = new CoordinatorRedirectInfo(druidCoordinator);
}
@Override
public boolean doLocal(String requestURI)
{
return isOverlordRequest(requestURI) ?
overlordRedirectInfo.doLocal(requestURI) :
coordinatorRedirectInfo.doLocal(requestURI);
}
@Override
public URL getRedirectURL(String queryString, String requestURI)
{
return isOverlordRequest(requestURI) ?
overlordRedirectInfo.getRedirectURL(queryString, requestURI) :
coordinatorRedirectInfo.getRedirectURL(queryString, requestURI);
}
private boolean isOverlordRequest(String requestURI)
{
return requestURI.startsWith("/druid/indexer");
}
}