diff --git a/docs/content/configuration/coordinator.md b/docs/content/configuration/coordinator.md index b4e4f0c8f79..397fcfc2d51 100644 --- a/docs/content/configuration/coordinator.md +++ b/docs/content/configuration/coordinator.md @@ -34,6 +34,8 @@ The coordinator node uses several of the global configs in [Configuration](../co |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| |`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`| |`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)| +|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this coordinator node should act like an overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone overlord nodes. If set to true, then be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false| +|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord nodes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL| ### Metadata Retrieval diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index 93ced507882..e70a2485245 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -37,6 +37,7 @@ import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.server.DruidNode; +import io.druid.server.coordinator.CoordinatorOverlordServiceConfig; import io.druid.server.initialization.IndexerZkConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelector; @@ -73,11 +74,12 @@ public class TaskMaster final TaskLockbox taskLockbox, final TaskStorage taskStorage, final TaskActionClientFactory taskActionClientFactory, - @Self final DruidNode node, + @Self final DruidNode selfNode, final IndexerZkConfig zkPaths, final TaskRunnerFactory runnerFactory, final CuratorFramework curator, final ServiceAnnouncer serviceAnnouncer, + final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, final ServiceEmitter emitter, final SupervisorManager supervisorManager, final OverlordHelperManager overlordHelperManager @@ -85,6 +87,10 @@ public class TaskMaster { this.supervisorManager = supervisorManager; this.taskActionClientFactory = taskActionClientFactory; + + final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode : + selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService()); + this.leaderSelector = new LeaderSelector( curator, zkPaths.getLeaderLatchPath(), diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordRedirectInfo.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordRedirectInfo.java index 86b4194ad9a..d02e294fed4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordRedirectInfo.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordRedirectInfo.java @@ -40,7 +40,7 @@ public class OverlordRedirectInfo implements RedirectInfo } @Override - public boolean doLocal() + public boolean doLocal(String requestURI) { return taskMaster.isLeading(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordRedirectInfoTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordRedirectInfoTest.java index d85a9d11fde..42cfb12de39 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordRedirectInfoTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordRedirectInfoTest.java @@ -44,7 +44,7 @@ public class OverlordRedirectInfoTest { EasyMock.expect(taskMaster.isLeading()).andReturn(true).anyTimes(); EasyMock.replay(taskMaster); - Assert.assertTrue(redirectInfo.doLocal()); + Assert.assertTrue(redirectInfo.doLocal(null)); EasyMock.verify(taskMaster); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 7ac6c94a16d..aef09f121ca 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -53,6 +53,7 @@ import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.server.DruidNode; +import io.druid.server.coordinator.CoordinatorOverlordServiceConfig; import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -183,6 +184,7 @@ public class OverlordTest announcementLatch.countDown(); } }, + new CoordinatorOverlordServiceConfig(null, null), serviceEmitter, supervisorManager, EasyMock.createNiceMock(OverlordHelperManager.class) diff --git a/server/src/main/java/io/druid/server/DruidNode.java b/server/src/main/java/io/druid/server/DruidNode.java index 13cb2c17662..7dca62d832d 100644 --- a/server/src/main/java/io/druid/server/DruidNode.java +++ b/server/src/main/java/io/druid/server/DruidNode.java @@ -129,6 +129,11 @@ public class DruidNode return port; } + public DruidNode withService(String service) + { + return new DruidNode(service, host, port); + } + /** * Returns host and port together as something that can be used as part of a URI. */ diff --git a/server/src/main/java/io/druid/server/coordinator/CoordinatorOverlordServiceConfig.java b/server/src/main/java/io/druid/server/coordinator/CoordinatorOverlordServiceConfig.java new file mode 100644 index 00000000000..c7742be0096 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/CoordinatorOverlordServiceConfig.java @@ -0,0 +1,56 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +/** + */ +public class CoordinatorOverlordServiceConfig +{ + @JsonProperty + private final boolean enabled; + + @JsonProperty + private final String overlordService; + + public CoordinatorOverlordServiceConfig( + @JsonProperty("enabled") Boolean enabled, + @JsonProperty("overlordService") String overlordService + ) + { + this.enabled = enabled == null ? false : enabled.booleanValue(); + this.overlordService = overlordService; + + Preconditions.checkArgument((this.enabled && this.overlordService != null) || !this.enabled, + "coordinator is enabled to be overlord but overlordService is not specified"); + } + + public boolean isEnabled() + { + return enabled; + } + + public String getOverlordService() + { + return overlordService; + } +} diff --git a/server/src/main/java/io/druid/server/http/CoordinatorRedirectInfo.java b/server/src/main/java/io/druid/server/http/CoordinatorRedirectInfo.java index 73609cd5f00..3d2f46d0a15 100644 --- a/server/src/main/java/io/druid/server/http/CoordinatorRedirectInfo.java +++ b/server/src/main/java/io/druid/server/http/CoordinatorRedirectInfo.java @@ -37,7 +37,7 @@ public class CoordinatorRedirectInfo implements RedirectInfo } @Override - public boolean doLocal() + public boolean doLocal(String requestURI) { return coordinator.isLeader(); } diff --git a/server/src/main/java/io/druid/server/http/RedirectFilter.java b/server/src/main/java/io/druid/server/http/RedirectFilter.java index 6f37694d52e..71a518af9d4 100644 --- a/server/src/main/java/io/druid/server/http/RedirectFilter.java +++ b/server/src/main/java/io/druid/server/http/RedirectFilter.java @@ -68,7 +68,7 @@ public class RedirectFilter implements Filter throw new ServletException("non-HTTP request or response"); } - if (redirectInfo.doLocal()) { + if (redirectInfo.doLocal(request.getRequestURI())) { chain.doFilter(request, response); } else { URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI()); diff --git a/server/src/main/java/io/druid/server/http/RedirectInfo.java b/server/src/main/java/io/druid/server/http/RedirectInfo.java index f42c32e5233..2c57fde4059 100644 --- a/server/src/main/java/io/druid/server/http/RedirectInfo.java +++ b/server/src/main/java/io/druid/server/http/RedirectInfo.java @@ -25,7 +25,7 @@ import java.net.URL; */ public interface RedirectInfo { - public boolean doLocal(); + public boolean doLocal(String requestURI); public URL getRedirectURL(String queryString, String requestURI); } diff --git a/server/src/test/java/io/druid/server/http/CoordinatorRedirectInfoTest.java b/server/src/test/java/io/druid/server/http/CoordinatorRedirectInfoTest.java index 274497a798e..1bec282bcdc 100644 --- a/server/src/test/java/io/druid/server/http/CoordinatorRedirectInfoTest.java +++ b/server/src/test/java/io/druid/server/http/CoordinatorRedirectInfoTest.java @@ -44,7 +44,7 @@ public class CoordinatorRedirectInfoTest { EasyMock.expect(druidCoordinator.isLeader()).andReturn(true).anyTimes(); EasyMock.replay(druidCoordinator); - Assert.assertTrue(coordinatorRedirectInfo.doLocal()); + Assert.assertTrue(coordinatorRedirectInfo.doLocal(null)); EasyMock.verify(druidCoordinator); } diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 98386a635cd..9a6b5ac4e87 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -21,7 +21,6 @@ package io.druid.cli; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Module; @@ -79,6 +78,7 @@ import io.druid.server.router.TieredBrokerConfig; import org.apache.curator.framework.CuratorFramework; import org.eclipse.jetty.server.Server; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Executors; @@ -94,6 +94,7 @@ public class CliCoordinator extends ServerRunnable private static final Logger log = new Logger(CliCoordinator.class); private Properties properties; + private boolean beOverlord; public CliCoordinator() { @@ -104,12 +105,19 @@ public class CliCoordinator extends ServerRunnable public void configure(Properties properties) { this.properties = properties; + beOverlord = isOverlord(properties); + + if (beOverlord) { + log.info("Coordinator is configured to act as Overlord as well."); + } } @Override protected List getModules() { - return ImmutableList.of( + List modules = new ArrayList<>(); + + modules.add( new Module() { @Override @@ -131,7 +139,11 @@ public class CliCoordinator extends ServerRunnable JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class); binder.bind(RedirectFilter.class).in(LazySingleton.class); - binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class); + if (beOverlord) { + binder.bind(RedirectInfo.class).to(CoordinatorOverlordRedirectInfo.class).in(LazySingleton.class); + } else { + binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class); + } binder.bind(MetadataSegmentManager.class) .toProvider(MetadataSegmentManagerProvider.class) @@ -192,7 +204,6 @@ public class CliCoordinator extends ServerRunnable Predicates.equalTo("true"), DruidCoordinatorSegmentKiller.class ); - } @Provides @@ -211,5 +222,16 @@ public class CliCoordinator extends ServerRunnable } } ); + + if (beOverlord) { + modules.addAll(new CliOverlord().getModules(false)); + } + + return modules; + } + + public static boolean isOverlord(Properties properties) + { + return Boolean.valueOf(properties.getProperty("druid.coordinator.asOverlord.enabled")).booleanValue(); } } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 4e9b822607b..390f6c441a7 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -79,6 +79,7 @@ import io.druid.indexing.worker.config.WorkerConfig; import io.druid.java.util.common.logger.Logger; import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.server.audit.AuditManagerProvider; +import io.druid.server.coordinator.CoordinatorOverlordServiceConfig; import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectInfo; import io.druid.server.initialization.jetty.JettyServerInitUtils; @@ -113,6 +114,11 @@ public class CliOverlord extends ServerRunnable @Override protected List getModules() + { + return getModules(true); + } + + protected List getModules(final boolean standalone) { return ImmutableList.of( new Module() @@ -120,11 +126,14 @@ public class CliOverlord extends ServerRunnable @Override public void configure(Binder binder) { - binder.bindConstant() - .annotatedWith(Names.named("serviceName")) - .to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090); + if (standalone) { + binder.bindConstant() + .annotatedWith(Names.named("serviceName")) + .to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090); + } + JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", CoordinatorOverlordServiceConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); @@ -160,14 +169,18 @@ public class CliOverlord extends ServerRunnable .toProvider(AuditManagerProvider.class) .in(ManageLifecycle.class); - binder.bind(RedirectFilter.class).in(LazySingleton.class); - binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class); + if (standalone) { + binder.bind(RedirectFilter.class).in(LazySingleton.class); + binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class); + binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer()); + } - binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer()); Jerseys.addResource(binder, OverlordResource.class); Jerseys.addResource(binder, SupervisorResource.class); - LifecycleModule.register(binder, Server.class); + if (standalone) { + LifecycleModule.register(binder, Server.class); + } } private void configureTaskStorage(Binder binder) diff --git a/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java b/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java index f4a70b5f0cc..4077e4ce97e 100644 --- a/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java @@ -37,16 +37,20 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.resource.ResourceCollection; +import java.util.Properties; + /** */ class CoordinatorJettyServerInitializer implements JettyServerInitializer { private final DruidCoordinatorConfig config; + private final boolean beOverlord; @Inject - CoordinatorJettyServerInitializer(DruidCoordinatorConfig config) + CoordinatorJettyServerInitializer(DruidCoordinatorConfig config, Properties properties) { this.config = config; + this.beOverlord = CliCoordinator.isOverlord(properties); } @Override @@ -59,10 +63,19 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer root.addServlet(holderPwd, "/"); if(config.getConsoleStatic() == null) { - ResourceCollection staticResources = new ResourceCollection( - Resource.newClassPathResource("io/druid/console"), - Resource.newClassPathResource("static") - ); + ResourceCollection staticResources; + if (beOverlord) { + staticResources = new ResourceCollection( + Resource.newClassPathResource("io/druid/console"), + Resource.newClassPathResource("static"), + Resource.newClassPathResource("indexer_static") + ); + } else { + staticResources = new ResourceCollection( + Resource.newClassPathResource("io/druid/console"), + Resource.newClassPathResource("static") + ); + } root.setBaseResource(staticResources); } else { // used for console development @@ -81,10 +94,15 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer // Can't use '/*' here because of Guice and Jetty static content conflicts root.addFilter(GuiceFilter.class, "/info/*", null); root.addFilter(GuiceFilter.class, "/druid/coordinator/*", null); + if (beOverlord) { + root.addFilter(GuiceFilter.class, "/druid/indexer/*", null); + } // this will be removed in the next major release root.addFilter(GuiceFilter.class, "/coordinator/*", null); - root.addServlet(new ServletHolder(injector.getInstance(OverlordProxyServlet.class)), "/druid/indexer/*"); + if (!beOverlord) { + root.addServlet(new ServletHolder(injector.getInstance(OverlordProxyServlet.class)), "/druid/indexer/*"); + } HandlerList handlerList = new HandlerList(); handlerList.setHandlers(new Handler[]{JettyServerInitUtils.getJettyRequestLogHandler(), root}); diff --git a/services/src/main/java/io/druid/cli/CoordinatorOverlordRedirectInfo.java b/services/src/main/java/io/druid/cli/CoordinatorOverlordRedirectInfo.java new file mode 100644 index 00000000000..755dce45f39 --- /dev/null +++ b/services/src/main/java/io/druid/cli/CoordinatorOverlordRedirectInfo.java @@ -0,0 +1,65 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.cli; + +import com.google.inject.Inject; +import io.druid.indexing.overlord.TaskMaster; +import io.druid.indexing.overlord.http.OverlordRedirectInfo; +import io.druid.server.coordinator.DruidCoordinator; +import io.druid.server.http.CoordinatorRedirectInfo; +import io.druid.server.http.RedirectInfo; + +import java.net.URL; + +/** + */ +public class CoordinatorOverlordRedirectInfo implements RedirectInfo +{ + private final OverlordRedirectInfo overlordRedirectInfo; + private final CoordinatorRedirectInfo coordinatorRedirectInfo; + + @Inject + public CoordinatorOverlordRedirectInfo(TaskMaster taskMaster, DruidCoordinator druidCoordinator) + { + this.overlordRedirectInfo = new OverlordRedirectInfo(taskMaster); + this.coordinatorRedirectInfo = new CoordinatorRedirectInfo(druidCoordinator); + } + + @Override + public boolean doLocal(String requestURI) + { + return isOverlordRequest(requestURI) ? + overlordRedirectInfo.doLocal(requestURI) : + coordinatorRedirectInfo.doLocal(requestURI); + } + + @Override + public URL getRedirectURL(String queryString, String requestURI) + { + return isOverlordRequest(requestURI) ? + overlordRedirectInfo.getRedirectURL(queryString, requestURI) : + coordinatorRedirectInfo.getRedirectURL(queryString, requestURI); + } + + private boolean isOverlordRequest(String requestURI) + { + return requestURI.startsWith("/druid/indexer"); + } +}