From 1ff04412a2e73496e31969e15b6254663d65bce8 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 23 Sep 2013 16:26:05 -0700 Subject: [PATCH 01/10] clean up realtime module and fix breakage in broker paths --- .../segment/realtime/DbSegmentPublisher.java | 2 + .../src/main/java/io/druid/cli/CliBroker.java | 37 ++++++++++++++++++- .../java/io/druid/guice/RealtimeModule.java | 23 ++++++++++-- 3 files changed, 57 insertions(+), 5 deletions(-) diff --git a/realtime/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/realtime/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java index 906eb5066a7..8ef9dfd193d 100644 --- a/realtime/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java +++ b/realtime/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.db.DbTablesConfig; import io.druid.timeline.DataSegment; @@ -40,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher private final DbTablesConfig config; private final IDBI dbi; + @Inject public DbSegmentPublisher( ObjectMapper jsonMapper, DbTablesConfig config, diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 9ccf354be75..9dc9e985960 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -21,7 +21,9 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Injector; import com.google.inject.Module; +import com.google.inject.servlet.GuiceFilter; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.client.BrokerServerView; @@ -42,9 +44,18 @@ import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChestWarehouse; import io.druid.server.ClientInfoResource; import io.druid.server.ClientQuerySegmentWalker; +import io.druid.server.QueryServlet; +import io.druid.server.StatusResource; import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.metrics.MetricsModule; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.GzipFilter; import java.util.List; @@ -81,7 +92,7 @@ public class CliBroker extends ServerRunnable JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class); binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class); - binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); + binder.bind(JettyServerInitializer.class).to(BrokerJettyServerInitializer.class).in(LazySingleton.class); Jerseys.addResource(binder, ClientInfoResource.class); DiscoveryModule.register(binder, Self.class); @@ -92,4 +103,28 @@ public class CliBroker extends ServerRunnable } ); } + + private static class BrokerJettyServerInitializer implements JettyServerInitializer + { + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler resources = new 
ServletContextHandler(ServletContextHandler.SESSIONS); + resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*"); + resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null); + + final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS); + queries.setResourceBase("/"); + queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*"); + + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(GzipFilter.class, "/*", null); + root.addFilter(GuiceFilter.class, "/*", null); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()}); + server.setHandler(handlerList); + } + } } diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index 98bce2e255e..13c182d352e 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -23,11 +23,15 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.TypeLiteral; +import com.google.inject.multibindings.MapBinder; import io.druid.cli.QueryJettyServerInitializer; import io.druid.initialization.DruidModule; import io.druid.query.QuerySegmentWalker; +import io.druid.segment.realtime.DbSegmentPublisher; import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.NoopSegmentPublisher; import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; @@ -38,17 +42,28 @@ import java.util.Arrays; import java.util.List; /** -*/ + */ public class RealtimeModule implements DruidModule { @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.publish", SegmentPublisherProvider.class); - binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class); + PolyBind.createChoice( + binder, + "druid.publish.type", + Key.get(SegmentPublisher.class), + Key.get(NoopSegmentPublisher.class) + ); + final MapBinder publisherBinder = PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class)); + publisherBinder.addBinding("db").to(DbSegmentPublisher.class); + binder.bind(DbSegmentPublisher.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); - binder.bind(new TypeLiteral>(){}) + binder.bind( + new TypeLiteral>() + { + } + ) .toProvider(FireDepartmentsProvider.class) .in(LazySingleton.class); From 15843c39780a8c06f40216777a0de4e7a024dd3c Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 24 Sep 2013 10:36:26 -0700 Subject: [PATCH 02/10] refactor how server service discovery is done --- .../actions/RemoteTaskActionClient.java | 10 +-- .../RemoteTaskActionClientFactory.java | 11 +-- .../indexing/IndexingServiceClient.java | 10 +-- .../curator/discovery/DiscoveryModule.java | 8 +++ .../discovery/ServerDiscoveryFactory.java | 72 +++++++++++++++++++ .../discovery/ServerDiscoverySelector.java} | 23 +++--- .../guice/IndexingServiceDiscoveryModule.java | 47 ++---------- 7 files changed, 115 
insertions(+), 66 deletions(-) create mode 100644 server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java rename server/src/main/java/io/druid/{client/indexing/IndexingServiceSelector.java => curator/discovery/ServerDiscoverySelector.java} (84%) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index b2613e13064..41639aa9989 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -27,8 +27,8 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.ToStringResponseHandler; -import io.druid.client.indexing.IndexingServiceSelector; import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -42,7 +42,7 @@ public class RemoteTaskActionClient implements TaskActionClient { private final Task task; private final HttpClient httpClient; - private final IndexingServiceSelector serviceProvider; + private final ServerDiscoverySelector selector; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @@ -51,14 +51,14 @@ public class RemoteTaskActionClient implements TaskActionClient public RemoteTaskActionClient( Task task, HttpClient httpClient, - IndexingServiceSelector serviceProvider, + ServerDiscoverySelector selector, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { this.task = task; this.httpClient = httpClient; - this.serviceProvider = serviceProvider; + this.selector = selector; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } @@ -127,7 +127,7 @@ public class RemoteTaskActionClient implements TaskActionClient private URI getServiceUri() throws Exception { - final Server instance = serviceProvider.pick(); + final Server instance = selector.pick(); if (instance == null) { throw new ISE("Cannot find instance of indexer to talk to!"); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java index 4ae53a595b9..033c1263ca3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java @@ -22,7 +22,8 @@ package io.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.metamx.http.client.HttpClient; -import io.druid.client.indexing.IndexingServiceSelector; +import io.druid.client.indexing.IndexingService; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -32,20 +33,20 @@ import io.druid.indexing.common.task.Task; public class RemoteTaskActionClientFactory implements TaskActionClientFactory { private final HttpClient httpClient; - private final IndexingServiceSelector serviceProvider; + private final 
ServerDiscoverySelector selector; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @Inject public RemoteTaskActionClientFactory( @Global HttpClient httpClient, - IndexingServiceSelector serviceProvider, + @IndexingService ServerDiscoverySelector selector, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { this.httpClient = httpClient; - this.serviceProvider = serviceProvider; + this.selector = selector; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } @@ -53,6 +54,6 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory @Override public TaskActionClient create(Task task) { - return new RemoteTaskActionClient(task, httpClient, serviceProvider, retryPolicyFactory, jsonMapper); + return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 628ed978c42..abae59ee168 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -26,8 +26,8 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.InputStreamResponseHandler; -import io.druid.client.selector.DiscoverySelector; import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -43,18 +43,18 @@ public class IndexingServiceClient private final HttpClient client; private final ObjectMapper jsonMapper; - private final DiscoverySelector serviceProvider; + private final ServerDiscoverySelector selector; @Inject public IndexingServiceClient( @Global HttpClient client, ObjectMapper jsonMapper, - @IndexingService DiscoverySelector serviceProvider + @IndexingService ServerDiscoverySelector selector ) { this.client = client; this.jsonMapper = jsonMapper; - this.serviceProvider = serviceProvider; + this.selector = selector; } public void mergeSegments(List segments) @@ -106,7 +106,7 @@ public class IndexingServiceClient private String baseUrl() { try { - final Server instance = serviceProvider.pick(); + final Server instance = selector.pick(); if (instance == null) { throw new ISE("Cannot find instance of indexingService"); } diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 3f577f2d64f..88fcdb3c299 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -224,6 +224,14 @@ public class DiscoveryModule implements Module return serviceDiscovery; } + @Provides @LazySingleton + public ServerDiscoveryFactory getServerDiscoveryFactory( + ServiceDiscovery serviceDiscovery + ) + { + return new ServerDiscoveryFactory(serviceDiscovery); + } + private static class NoopServiceDiscovery implements ServiceDiscovery { @Override diff --git a/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java new file mode 100644 index 00000000000..c436289e70e --- /dev/null +++ 
b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java @@ -0,0 +1,72 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.curator.discovery; + +import com.google.inject.Inject; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceProvider; + +import java.io.IOException; + +/** + */ +public class ServerDiscoveryFactory +{ + private final ServiceDiscovery serviceDiscovery; + + @Inject + public ServerDiscoveryFactory(ServiceDiscovery serviceDiscovery) + { + this.serviceDiscovery = serviceDiscovery; + } + + public ServerDiscoverySelector createSelector(String serviceName) + { + if (serviceName == null) { + return new ServerDiscoverySelector(new NoopServiceProvider()); + } + + final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).build(); + return new ServerDiscoverySelector(serviceProvider); + } + + private static class NoopServiceProvider implements ServiceProvider + { + @Override + public void start() throws Exception + { + // do nothing + } + + @Override + public ServiceInstance getInstance() throws Exception + { + return null; + } + + @Override + public void close() throws IOException + { + // do nothing + } + } + +} diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java similarity index 84% rename from server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java rename to server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java index 7c5eaaf8d72..b7cd72abf17 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java @@ -17,9 +17,8 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
*/ -package io.druid.client.indexing; +package io.druid.curator.discovery; -import com.google.inject.Inject; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; @@ -28,21 +27,18 @@ import io.druid.client.selector.Server; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; -import javax.annotation.Nullable; import java.io.IOException; /** -*/ -public class IndexingServiceSelector implements DiscoverySelector + */ +public class ServerDiscoverySelector implements DiscoverySelector { - private static final Logger log = new Logger(IndexingServiceSelector.class); + private static final Logger log = new Logger(ServerDiscoverySelector.class); private final ServiceProvider serviceProvider; - @Inject - public IndexingServiceSelector( - @Nullable @IndexingService ServiceProvider serviceProvider - ) { + public ServerDiscoverySelector(ServiceProvider serviceProvider) + { this.serviceProvider = serviceProvider; } @@ -54,7 +50,12 @@ public class IndexingServiceSelector implements DiscoverySelector instance = serviceProvider.getInstance(); } catch (Exception e) { - log.info(e, ""); + log.info(e, "Exception getting instance"); + return null; + } + + if (instance == null) { + log.error("No server instance found"); return null; } diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index 60e0c15281b..10535bb9d87 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -22,17 +22,10 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; -import com.google.inject.TypeLiteral; import io.druid.client.indexing.IndexingService; -import io.druid.client.indexing.IndexingServiceSelector; import io.druid.client.indexing.IndexingServiceSelectorConfig; -import io.druid.client.selector.DiscoverySelector; -import io.druid.client.selector.Server; -import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; - -import java.io.IOException; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; /** */ @@ -42,42 +35,16 @@ public class IndexingServiceDiscoveryModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class); - binder.bind(new TypeLiteral>(){}) - .annotatedWith(IndexingService.class) - .to(IndexingServiceSelector.class); - - binder.bind(IndexingServiceSelector.class).in(ManageLifecycle.class); } @Provides - @LazySingleton @IndexingService - public ServiceProvider getServiceProvider( + @IndexingService + @ManageLifecycle + public ServerDiscoverySelector getServiceProvider( IndexingServiceSelectorConfig config, - ServiceDiscovery serviceDiscovery + ServerDiscoveryFactory serverDiscoveryFactory ) { - if (config.getServiceName() == null) { - return new ServiceProvider() - { - @Override - public void start() throws Exception - { - - } - - @Override - public ServiceInstance getInstance() throws Exception - { - return null; - } - - @Override - public void close() throws IOException - { - - } - }; - } - return 
serviceDiscovery.serviceProviderBuilder().serviceName(config.getServiceName()).build(); + return serverDiscoveryFactory.createSelector(config.getServiceName()); } } From be5bb7f2eb13f709464acfd8f04eca4fe2628965 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 24 Sep 2013 14:20:56 -0700 Subject: [PATCH 03/10] fix lifecycle startup/stop ordering problem with discovery module and make druid able to load local extensions --- .../curator/discovery/DiscoveryModule.java | 47 ++++++++++++++----- .../initialization/ExtensionsConfig.java | 9 ++++ .../java/io/druid/cli/Initialization.java | 30 ++++++------ 3 files changed, 59 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 88fcdb3c299..ca5340d9565 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -59,7 +59,7 @@ import java.util.concurrent.ThreadFactory; /** * The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be * automatically announced at the end of the lifecycle start. - * + *
<p/>
* In order for this to work a ServiceAnnouncer instance *must* be injected and instantiated first. * This can often be achieved by registering ServiceAnnouncer.class with the LifecycleModule. */ @@ -69,19 +69,25 @@ public class DiscoveryModule implements Module /** * Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle. - * + *
<p/>
* That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class)) automatically. * Announcement will happen in the LAST stage of the Lifecycle */ public static void registerDefault(Binder binder) { - registerKey(binder, Key.get(new TypeLiteral(){})); + registerKey( + binder, Key.get( + new TypeLiteral() + { + } + ) + ); } /** * Requests that the annotated DruidNode instance be injected and published as part of the lifecycle. - * + *
<p/>
* That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * Announcement will happen in the LAST stage of the Lifecycle @@ -90,12 +96,18 @@ public class DiscoveryModule implements Module */ public static void register(Binder binder, Annotation annotation) { - registerKey(binder, Key.get(new TypeLiteral(){}, annotation)); + registerKey( + binder, Key.get( + new TypeLiteral() + { + }, annotation + ) + ); } /** * Requests that the annotated DruidNode instance be injected and published as part of the lifecycle. - * + *
<p/>
* That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * Announcement will happen in the LAST stage of the Lifecycle @@ -104,12 +116,18 @@ public class DiscoveryModule implements Module */ public static void register(Binder binder, Class annotation) { - registerKey(binder, Key.get(new TypeLiteral(){}, annotation)); + registerKey( + binder, Key.get( + new TypeLiteral() + { + }, annotation + ) + ); } /** * Requests that the keyed DruidNode instance be injected and published as part of the lifecycle. - * + *
<p/>
* That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * Announcement will happen in the LAST stage of the Lifecycle @@ -137,7 +155,9 @@ public class DiscoveryModule implements Module .asEagerSingleton(); } - @Provides @LazySingleton @Named(NAME) + @Provides + @LazySingleton + @Named(NAME) public CuratorServiceAnnouncer getServiceAnnouncer( final CuratorServiceAnnouncer announcer, final Injector injector, @@ -181,7 +201,8 @@ public class DiscoveryModule implements Module return announcer; } - @Provides @LazySingleton + @Provides + @LazySingleton public ServiceDiscovery getServiceDiscovery( CuratorFramework curator, CuratorDiscoveryConfig config, @@ -217,14 +238,14 @@ public class DiscoveryModule implements Module throw Throwables.propagate(e); } } - }, - Lifecycle.Stage.LAST + } ); return serviceDiscovery; } - @Provides @LazySingleton + @Provides + @LazySingleton public ServerDiscoveryFactory getServerDiscoveryFactory( ServiceDiscovery serviceDiscovery ) diff --git a/server/src/main/java/io/druid/server/initialization/ExtensionsConfig.java b/server/src/main/java/io/druid/server/initialization/ExtensionsConfig.java index 0973a249434..46403358758 100644 --- a/server/src/main/java/io/druid/server/initialization/ExtensionsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ExtensionsConfig.java @@ -29,6 +29,10 @@ import java.util.List; */ public class ExtensionsConfig { + @JsonProperty + @NotNull + private boolean searchCurrentClassloader = true; + @JsonProperty @NotNull private List coordinates = ImmutableList.of(); @@ -44,6 +48,11 @@ public class ExtensionsConfig "https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local" ); + public boolean searchCurrentClassloader() + { + return searchCurrentClassloader; + } + public List getCoordinates() { return coordinates; diff --git a/services/src/main/java/io/druid/cli/Initialization.java b/services/src/main/java/io/druid/cli/Initialization.java index 8b1f1fa7640..f983762ccb0 100644 --- a/services/src/main/java/io/druid/cli/Initialization.java +++ b/services/src/main/java/io/druid/cli/Initialization.java @@ -139,14 +139,13 @@ public class for (Artifact artifact : artifacts) { if (!exclusions.contains(artifact.getGroupId())) { urls.add(artifact.getFile().toURI().toURL()); - } - else { + } else { log.error("Skipped Artifact[%s]", artifact); } } for (URL url : urls) { - log.error("Added URL[%s]", url); + log.info("Added URL[%s]", url); } loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader()); @@ -165,6 +164,13 @@ public class } } + if (config.searchCurrentClassloader()) { + for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) { + log.info("Adding local module[%s]", module.getClass()); + retVal.add(module); + } + } + return retVal; } @@ -243,7 +249,8 @@ public class private final ObjectMapper smileMapper; private final List modules; - public ModuleList(Injector baseInjector) { + public ModuleList(Injector baseInjector) + { this.baseInjector = baseInjector; this.jsonMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Json.class)); this.smileMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Smile.class)); @@ -260,24 +267,19 @@ public class if (input instanceof DruidModule) { baseInjector.injectMembers(input); modules.add(registerJacksonModules(((DruidModule) input))); - } - else if (input instanceof Module) { + } else if (input instanceof 
Module) { baseInjector.injectMembers(input); modules.add((Module) input); - } - else if (input instanceof Class) { + } else if (input instanceof Class) { if (DruidModule.class.isAssignableFrom((Class) input)) { modules.add(registerJacksonModules(baseInjector.getInstance((Class) input))); - } - else if (Module.class.isAssignableFrom((Class) input)) { + } else if (Module.class.isAssignableFrom((Class) input)) { modules.add(baseInjector.getInstance((Class) input)); return; - } - else { + } else { throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class); } - } - else { + } else { throw new ISE("Unknown module type[%s]", input.getClass()); } } From 19276f6badab8fe818c77c4205ffe22cb129e582 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 24 Sep 2013 14:34:09 -0700 Subject: [PATCH 04/10] fix spacing issues and other code review comments --- .../curator/discovery/DiscoveryModule.java | 24 +++---------------- .../java/io/druid/cli/Initialization.java | 14 +++++------ 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index ca5340d9565..a632392fbac 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -76,13 +76,7 @@ public class DiscoveryModule implements Module */ public static void registerDefault(Binder binder) { - registerKey( - binder, Key.get( - new TypeLiteral() - { - } - ) - ); + registerKey(binder, Key.get(new TypeLiteral(){})); } /** @@ -96,13 +90,7 @@ public class DiscoveryModule implements Module */ public static void register(Binder binder, Annotation annotation) { - registerKey( - binder, Key.get( - new TypeLiteral() - { - }, annotation - ) - ); + registerKey(binder, Key.get(new TypeLiteral(){}, annotation)); } /** @@ -116,13 +104,7 @@ public class DiscoveryModule implements Module */ public static void register(Binder binder, Class annotation) { - registerKey( - binder, Key.get( - new TypeLiteral() - { - }, annotation - ) - ); + registerKey(binder, Key.get(new TypeLiteral(){}, annotation)); } /** diff --git a/services/src/main/java/io/druid/cli/Initialization.java b/services/src/main/java/io/druid/cli/Initialization.java index f983762ccb0..76f502b1563 100644 --- a/services/src/main/java/io/druid/cli/Initialization.java +++ b/services/src/main/java/io/druid/cli/Initialization.java @@ -97,6 +97,13 @@ public class final TeslaAether aether = getAetherClient(config); List retVal = Lists.newArrayList(); + if (config.searchCurrentClassloader()) { + for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) { + log.info("Adding local module[%s]", module.getClass()); + retVal.add(module); + } + } + for (String coordinate : config.getCoordinates()) { log.info("Loading extension[%s]", coordinate); try { @@ -164,13 +171,6 @@ public class } } - if (config.searchCurrentClassloader()) { - for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) { - log.info("Adding local module[%s]", module.getClass()); - retVal.add(module); - } - } - return retVal; } From 45e22d98f1fb4d95ac3f46ad3c9daeb3ed576700 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 25 Sep 2013 07:44:17 -0700 Subject: [PATCH 05/10] Fix S3DataSegmentPuller retry bug --- .../io/druid/segment/loading/S3DataSegmentPuller.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git 
a/server/src/main/java/io/druid/segment/loading/S3DataSegmentPuller.java b/server/src/main/java/io/druid/segment/loading/S3DataSegmentPuller.java index 49b7501ad51..abf4b3502cf 100644 --- a/server/src/main/java/io/druid/segment/loading/S3DataSegmentPuller.java +++ b/server/src/main/java/io/druid/segment/loading/S3DataSegmentPuller.java @@ -110,7 +110,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller return null; } catch (IOException e) { - FileUtils.deleteDirectory(outDir); throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e); } finally { @@ -125,6 +124,16 @@ public class S3DataSegmentPuller implements DataSegmentPuller ); } catch (Exception e) { + try { + FileUtils.deleteDirectory(outDir); + } catch (IOException ioe) { + log.warn( + ioe, + "Failed to remove output directory for segment[%s] after exception: %s", + segment.getIdentifier(), + outDir + ); + } throw new SegmentLoadingException(e, e.getMessage()); } } From 87259321b610aeec50219207012a401a8b29706a Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 26 Sep 2013 11:04:42 -0700 Subject: [PATCH 06/10] port hadoop druid indexer to new guice framework --- .../java/io/druid/guice/ConfigProvider.java | 17 +- .../io/druid/indexer/HadoopDruidIndexer.java | 130 --------------- .../indexer/HadoopDruidIndexerConfig.java | 66 ++++---- .../druid/indexer/HadoopDruidIndexerJob.java | 2 + .../indexing/common/task/TaskSerdeTest.java | 4 +- .../curator/discovery/DiscoveryModule.java | 6 +- .../cli/BrokerJettyServerInitializer.java | 59 +++++++ .../src/main/java/io/druid/cli/CliBroker.java | 35 ---- .../java/io/druid/cli/CliHadoopIndexer.java | 157 ++++++++++++++++++ .../main/java/io/druid/cli/GuiceRunnable.java | 76 +++++++++ services/src/main/java/io/druid/cli/Main.java | 5 + .../java/io/druid/cli/ServerRunnable.java | 38 +---- 12 files changed, 352 insertions(+), 243 deletions(-) delete mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexer.java create mode 100644 services/src/main/java/io/druid/cli/BrokerJettyServerInitializer.java create mode 100644 services/src/main/java/io/druid/cli/CliHadoopIndexer.java create mode 100644 services/src/main/java/io/druid/cli/GuiceRunnable.java diff --git a/common/src/main/java/io/druid/guice/ConfigProvider.java b/common/src/main/java/io/druid/guice/ConfigProvider.java index 47335ac1c24..ab33c580ff8 100644 --- a/common/src/main/java/io/druid/guice/ConfigProvider.java +++ b/common/src/main/java/io/druid/guice/ConfigProvider.java @@ -57,7 +57,7 @@ public class ConfigProvider implements Provider private final Class clazz; private final Map replacements; - private T object = null; + private ConfigurationObjectFactory factory = null; public ConfigProvider( Class clazz, @@ -70,20 +70,21 @@ public class ConfigProvider implements Provider @Inject public void inject(ConfigurationObjectFactory factory) + { + this.factory = factory; + } + + @Override + public T get() { try { // ConfigMagic handles a null replacements - object = factory.buildWithReplacements(clazz, replacements); + Preconditions.checkNotNull(factory, "WTF!? Code misconfigured, inject() didn't get called."); + return factory.buildWithReplacements(clazz, replacements); } catch (IllegalArgumentException e) { log.info("Unable to build instance of class[%s]", clazz); throw e; } } - - @Override - public T get() - { - return Preconditions.checkNotNull(object, "WTF!? 
Code misconfigured, inject() didn't get called."); - } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexer.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexer.java deleted file mode 100644 index 820c301a2ef..00000000000 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexer.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.indexer; - -import com.google.common.collect.ImmutableList; -import com.metamx.common.Pair; -import com.metamx.common.lifecycle.Lifecycle; - -import java.util.List; - -/** - */ -@Deprecated -public class HadoopDruidIndexer -{ - public static void main(String[] args) throws Exception - { - if (args.length < 1 || args.length > 2) { - printHelp(); - System.exit(2); - } - - HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build(); - - if (args.length == 2) { - node.setIntervalSpec(args[0]); - } - node.setArgumentSpec(args[args.length == 1 ? 0 : 1]); - - Lifecycle lifecycle = new Lifecycle(); - lifecycle.addManagedInstance(node); - - try { - lifecycle.start(); - } - catch (Exception e) { - e.printStackTrace(); - Thread.sleep(500); - printHelp(); - System.exit(1); - } - } - - private static final List> expectedFields = - ImmutableList.>builder() - .add(Pair.of("dataSource", "Name of dataSource")) - .add(Pair.of("timestampColumn", "Column name of the timestamp column")) - .add(Pair.of("timestampFormat", "Format name of the timestamp column (posix or iso)")) - .add( - Pair.of( - "dataSpec", - "A JSON object with fields " - + - "format=(json, csv, tsv), " - + - "columns=JSON array of column names for the delimited text input file (only for csv and tsv formats)," - + - "dimensions=JSON array of dimensionn names (must match names in columns)," - + - "delimiter=delimiter of the data (only for tsv format)" - ) - ) - .add( - Pair.of( - "granularitySpec", - "A JSON object indicating the Granularity that segments should be created at." - ) - ) - .add( - Pair.of( - "pathSpec", - "A JSON object with fields type=granularity, inputPath, filePattern, dataGranularity" - ) - ) - .add( - Pair.of( - "rollupSpec", - "JSON object with fields rollupGranularity, aggs=JSON Array of Aggregator specs" - ) - ) - .add(Pair.of("workingPath", "Path to store intermediate output data. Deleted when finished.")) - .add(Pair.of("segmentOutputPath", "Path to store output segments.")) - .add( - Pair.of( - "updaterJobSpec", - "JSON object with fields type=db, connectURI of the database, username, password, and segment table name" - ) - ) - .add(Pair.of("cleanupOnFailure", "Clean up intermediate files on failure? (default: true)")) - .add(Pair.of("leaveIntermediate", "Leave intermediate files. 
(default: false)")) - .add(Pair.of("partitionDimension", "Dimension to partition by (optional)")) - .add( - Pair.of( - "targetPartitionSize", - "Integer representing the target number of rows in a partition (required if partitionDimension != null)" - ) - ) - .build(); - - private static void printHelp() - { - System.out.println("Usage: "); - System.out.println(" is either a JSON object or the path to a file that contains a JSON object."); - System.out.println(); - System.out.println("JSON object description:"); - System.out.println("{"); - for (Pair expectedField : expectedFields) { - System.out.printf(" \"%s\": %s%n", expectedField.lhs, expectedField.rhs); - } - System.out.println("}"); - } - -} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 70a3bb04a30..6723f49dbae 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -97,26 +97,26 @@ public class HadoopDruidIndexerConfig public static HadoopDruidIndexerConfig fromMap(Map argSpec) { - List registererers = Lists.transform( - MapUtils.getList(argSpec, "registererers", ImmutableList.of()), - new Function() - { - @Override - public Registererer apply(@Nullable Object input) - { - try { - return (Registererer) Class.forName((String) input).newInstance(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ); + //List registererers = Lists.transform( + // MapUtils.getList(argSpec, "registererers", ImmutableList.of()), + // new Function() + // { + // @Override + // public Registererer apply(@Nullable Object input) + // { + // try { + // return (Registererer) Class.forName((String) input).newInstance(); + // } + // catch (Exception e) { + // throw Throwables.propagate(e); + // } + // } + // } + //); - if (!registererers.isEmpty()) { - Registererers.registerHandlers(registererers, Arrays.asList(jsonMapper)); - } + //if (!registererers.isEmpty()) { + // Registererers.registerHandlers(registererers, Arrays.asList(jsonMapper)); + //} return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class); } @@ -179,7 +179,7 @@ public class HadoopDruidIndexerConfig private volatile DataRollupSpec rollupSpec; private volatile DbUpdaterJobSpec updaterJobSpec; private volatile boolean ignoreInvalidRows = false; - private volatile List registererers = Lists.newArrayList(); + //private volatile List registererers = Lists.newArrayList(); @JsonCreator public HadoopDruidIndexerConfig( @@ -203,8 +203,8 @@ public class HadoopDruidIndexerConfig final @JsonProperty("overwriteFiles") boolean overwriteFiles, final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec, final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec, - final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, - final @JsonProperty("registererers") List registererers + final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows + //final @JsonProperty("registererers") List registererers ) { this.dataSource = dataSource; @@ -224,7 +224,7 @@ public class HadoopDruidIndexerConfig this.rollupSpec = rollupSpec; this.updaterJobSpec = updaterJobSpec; this.ignoreInvalidRows = ignoreInvalidRows; - this.registererers = registererers; + //this.registererers = registererers; if(partitionsSpec != null) { Preconditions.checkArgument( @@ -517,16 +517,16 @@ public class HadoopDruidIndexerConfig 
this.ignoreInvalidRows = ignoreInvalidRows; } - @JsonProperty - public List getRegistererers() - { - return registererers; - } - - public void setRegistererers(List registererers) - { - this.registererers = registererers; - } + //@JsonProperty + //public List getRegistererers() + //{ + // return registererers; + //} + // + //public void setRegistererers(List registererers) + //{ + // this.registererers = registererers; + //} /******************************************** Granularity/Bucket Helper Methods diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java index b0ac9078ddc..ba024125cb8 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java @@ -22,6 +22,7 @@ package io.druid.indexer; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.timeline.DataSegment; @@ -48,6 +49,7 @@ public class HadoopDruidIndexerJob implements Jobby private IndexGeneratorJob indexJob; private volatile List publishedSegments = null; + @Inject public HadoopDruidIndexerJob( HadoopDruidIndexerConfig config ) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index eaceac3ed9e..a868f3d7fee 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -359,8 +359,8 @@ public class TaskSerdeTest false, new DataRollupSpec(ImmutableList.of(), QueryGranularity.NONE), null, - false, - ImmutableList.of() + false + //ImmutableList.of() ) ); diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index a632392fbac..f6caf327332 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -35,6 +35,8 @@ import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; import io.druid.guice.KeyHolder; import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.annotations.Self; import io.druid.server.DruidNode; import io.druid.server.initialization.CuratorDiscoveryConfig; import org.apache.curator.framework.CuratorFramework; @@ -118,7 +120,7 @@ public class DiscoveryModule implements Module */ public static void registerKey(Binder binder, Key key) { - DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key)); + LifecycleModule.registerKey(binder, key); } @Override @@ -134,7 +136,7 @@ public class DiscoveryModule implements Module // We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect binder.bind(ServiceAnnouncer.class) .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) - .asEagerSingleton(); + .in(LazySingleton.class); } @Provides diff --git a/services/src/main/java/io/druid/cli/BrokerJettyServerInitializer.java b/services/src/main/java/io/druid/cli/BrokerJettyServerInitializer.java new file mode 100644 index 00000000000..08610018e19 --- /dev/null 
+++ b/services/src/main/java/io/druid/cli/BrokerJettyServerInitializer.java @@ -0,0 +1,59 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.inject.Injector; +import com.google.inject.servlet.GuiceFilter; +import io.druid.server.QueryServlet; +import io.druid.server.initialization.JettyServerInitializer; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.GzipFilter; + +/** + */ +public class BrokerJettyServerInitializer implements JettyServerInitializer +{ + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS); + resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*"); + resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null); + + final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS); + queries.setResourceBase("/"); + queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*"); + + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(GzipFilter.class, "/*", null); + root.addFilter(GuiceFilter.class, "/*", null); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()}); + server.setHandler(handlerList); + } +} diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 9dc9e985960..095300d5de1 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -21,9 +21,7 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import com.google.inject.Injector; import com.google.inject.Module; -import com.google.inject.servlet.GuiceFilter; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.client.BrokerServerView; @@ -44,18 +42,9 @@ import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChestWarehouse; import io.druid.server.ClientInfoResource; import io.druid.server.ClientQuerySegmentWalker; -import io.druid.server.QueryServlet; -import io.druid.server.StatusResource; 
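
For context, the extracted BrokerJettyServerInitializer above is never invoked directly; it is consumed through the JettyServerInitializer binding when the embedded Jetty server is assembled. A minimal sketch of that handoff, assuming a Guice-plus-embedded-Jetty bootstrap outside this patch (the class name, method name, and port below are illustrative assumptions, not code from this series):

import com.google.inject.Injector;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server;

public class JettyBootstrapSketch
{
  // Illustrative only: roughly how a bound JettyServerInitializer is applied
  // to a freshly constructed Jetty Server. The port is an assumption.
  public static Server makeAndStartServer(Injector injector) throws Exception
  {
    final Server server = new Server(8080);
    injector.getInstance(JettyServerInitializer.class).initialize(server, injector);
    server.start();
    return server;
  }
}

In the patch itself the same effect comes from binder.bind(JettyServerInitializer.class).to(BrokerJettyServerInitializer.class) in CliBroker, so the broker picks up the relocated class without any bootstrap changes.
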
import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.metrics.MetricsModule; -import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.DefaultHandler; -import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.servlets.GzipFilter; import java.util.List; @@ -103,28 +92,4 @@ public class CliBroker extends ServerRunnable } ); } - - private static class BrokerJettyServerInitializer implements JettyServerInitializer - { - @Override - public void initialize(Server server, Injector injector) - { - final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS); - resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*"); - resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null); - - final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS); - queries.setResourceBase("/"); - queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*"); - - final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); - root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - root.addFilter(GzipFilter.class, "/*", null); - root.addFilter(GuiceFilter.class, "/*", null); - - final HandlerList handlerList = new HandlerList(); - handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()}); - server.setHandler(handlerList); - } - } } diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java new file mode 100644 index 00000000000..b69fcd30212 --- /dev/null +++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java @@ -0,0 +1,157 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.cli; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.io.CharStreams; +import com.google.common.io.InputSupplier; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.core.LoggingEmitter; +import com.metamx.emitter.core.LoggingEmitterConfig; +import com.metamx.emitter.service.ServiceEmitter; +import io.airlift.command.Arguments; +import io.airlift.command.Command; +import io.druid.guice.LazySingleton; +import io.druid.guice.ManageLifecycle; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopDruidIndexerJob; +import io.druid.initialization.LogLevelAdjuster; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + +/** + */ +@Command( + name = "hadoop", + description = "Runs the batch Hadoop Druid Indexer, see LINK GOES HERE for a description." +) +public class CliHadoopIndexer extends GuiceRunnable +{ + @Arguments(description = "A JSON object or the path to a file that contains a JSON object") + private String argumentSpec; + + private static final Logger log = new Logger(CliHadoopIndexer.class); + + public CliHadoopIndexer() + { + super(log); + } + + @Override + protected List getModules() + { + return ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(HadoopDruidIndexerJob.class).in(LazySingleton.class); + } + + @Provides + @LazySingleton + public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig() + { + Preconditions.checkNotNull(argumentSpec, "argumentSpec"); + + try { + if (argumentSpec.startsWith("{")) { + return HadoopDruidIndexerConfig.fromString(argumentSpec); + } else if (argumentSpec.startsWith("s3://")) { + final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length()))); + final FileSystem fs = s3nPath.getFileSystem(new Configuration()); + + String configString = CharStreams.toString( + new InputSupplier() + { + @Override + public InputStreamReader getInput() throws IOException + { + return new InputStreamReader(fs.open(s3nPath)); + } + } + ); + + return HadoopDruidIndexerConfig.fromString(configString); + } else { + return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec)); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } + + @Override + public void run() + { + try { + LogLevelAdjuster.register(); + + final Injector injector = Initialization.makeInjectorWithModules( + getBaseInjector(), getModules() + ); + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + final HadoopDruidIndexerJob job = injector.getInstance(HadoopDruidIndexerJob.class); + + try { + lifecycle.start(); + } + catch (Throwable t) { + log.error(t, "Error when starting up. Failing."); + System.exit(1); + } + + job.run(); + + try { + lifecycle.stop(); + } + catch (Throwable t) { + log.error(t, "Error when stopping. 
Failing."); + System.exit(1); + } + + } + catch (Exception e) { + throw com.google.common.base.Throwables.propagate(e); + } + } + +} \ No newline at end of file diff --git a/services/src/main/java/io/druid/cli/GuiceRunnable.java b/services/src/main/java/io/druid/cli/GuiceRunnable.java new file mode 100644 index 00000000000..f39bea6bac2 --- /dev/null +++ b/services/src/main/java/io/druid/cli/GuiceRunnable.java @@ -0,0 +1,76 @@ +package io.druid.cli; + +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import io.druid.initialization.LogLevelAdjuster; + +import java.util.List; + +/** + */ +public abstract class GuiceRunnable implements Runnable +{ + private final Logger log; + + private Injector baseInjector; + + public GuiceRunnable(Logger log) + { + this.log = log; + } + + @Inject + public void configure(Injector injector) + { + this.baseInjector = injector; + } + + public Injector getBaseInjector() + { + return baseInjector; + } + + protected abstract List getModules(); + + public Injector makeInjector() + { + try { + return Initialization.makeInjectorWithModules( + baseInjector, getModules() + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + public Lifecycle initLifecycle(Injector injector) + { + try { + LogLevelAdjuster.register(); + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + + try { + lifecycle.start(); + } + catch (Throwable t) { + log.error(t, "Error when starting up. Failing."); + System.exit(1); + } + + return lifecycle; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void run() + { + initLifecycle(makeInjector()); + } +} diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index 3c39a82d0ed..6f7755fd02c 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -68,6 +68,11 @@ public class Main .withDefaultCommand(Help.class) .withCommands(ConvertProperties.class); + builder.withGroup("index") + .withDescription("Run indexing for druid") + .withDefaultCommand(Help.class) + .withCommands(CliHadoopIndexer.class); + builder.withGroup("internal") .withDescription("Processes that Druid runs \"internally\", you should rarely use these directly") .withDefaultCommand(Help.class) diff --git a/services/src/main/java/io/druid/cli/ServerRunnable.java b/services/src/main/java/io/druid/cli/ServerRunnable.java index e4ea30b1a70..733a8ee8555 100644 --- a/services/src/main/java/io/druid/cli/ServerRunnable.java +++ b/services/src/main/java/io/druid/cli/ServerRunnable.java @@ -20,54 +20,26 @@ package io.druid.cli; import com.google.common.base.Throwables; -import com.google.inject.Inject; import com.google.inject.Injector; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; -import io.druid.initialization.LogLevelAdjuster; - -import java.util.List; /** */ -public abstract class ServerRunnable implements Runnable +public abstract class ServerRunnable extends GuiceRunnable { - private final Logger log; - - private Injector baseInjector; - public ServerRunnable(Logger log) { - this.log = log; + super(log); } - @Inject - public void configure(Injector injector) - { - this.baseInjector = injector; - } - - protected abstract List getModules(); - @Override public void run() { + final Injector injector = makeInjector(); + 
final Lifecycle lifecycle = initLifecycle(injector); + try { - LogLevelAdjuster.register(); - - final Injector injector = Initialization.makeInjectorWithModules( - baseInjector, getModules() - ); - final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); - - try { - lifecycle.start(); - } - catch (Throwable t) { - log.error(t, "Error when starting up. Failing."); - System.exit(1); - } - lifecycle.join(); } catch (Exception e) { From 8bc56daa663202aa06782c0ebefe7bbad696f334 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 26 Sep 2013 11:35:45 -0700 Subject: [PATCH 07/10] fix things up according to code review comments --- .../indexer/HadoopDruidIndexerConfig.java | 35 ------------------- .../indexing/common/task/TaskSerdeTest.java | 1 - .../curator/discovery/DiscoveryModule.java | 1 + .../java/io/druid/cli/CliHadoopIndexer.java | 26 +++----------- .../main/java/io/druid/cli/GuiceRunnable.java | 11 ------ 5 files changed, 5 insertions(+), 69 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 6723f49dbae..89922ad4921 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -97,27 +97,6 @@ public class HadoopDruidIndexerConfig public static HadoopDruidIndexerConfig fromMap(Map argSpec) { - //List registererers = Lists.transform( - // MapUtils.getList(argSpec, "registererers", ImmutableList.of()), - // new Function() - // { - // @Override - // public Registererer apply(@Nullable Object input) - // { - // try { - // return (Registererer) Class.forName((String) input).newInstance(); - // } - // catch (Exception e) { - // throw Throwables.propagate(e); - // } - // } - // } - //); - - //if (!registererers.isEmpty()) { - // Registererers.registerHandlers(registererers, Arrays.asList(jsonMapper)); - //} - return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class); } @@ -179,7 +158,6 @@ public class HadoopDruidIndexerConfig private volatile DataRollupSpec rollupSpec; private volatile DbUpdaterJobSpec updaterJobSpec; private volatile boolean ignoreInvalidRows = false; - //private volatile List registererers = Lists.newArrayList(); @JsonCreator public HadoopDruidIndexerConfig( @@ -204,7 +182,6 @@ public class HadoopDruidIndexerConfig final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec, final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec, final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows - //final @JsonProperty("registererers") List registererers ) { this.dataSource = dataSource; @@ -224,7 +201,6 @@ public class HadoopDruidIndexerConfig this.rollupSpec = rollupSpec; this.updaterJobSpec = updaterJobSpec; this.ignoreInvalidRows = ignoreInvalidRows; - //this.registererers = registererers; if(partitionsSpec != null) { Preconditions.checkArgument( @@ -517,17 +493,6 @@ public class HadoopDruidIndexerConfig this.ignoreInvalidRows = ignoreInvalidRows; } - //@JsonProperty - //public List getRegistererers() - //{ - // return registererers; - //} - // - //public void setRegistererers(List registererers) - //{ - // this.registererers = registererers; - //} - /******************************************** Granularity/Bucket Helper Methods ********************************************/ diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java 
b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index a868f3d7fee..9c3ace35806 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -360,7 +360,6 @@ public class TaskSerdeTest new DataRollupSpec(ImmutableList.of(), QueryGranularity.NONE), null, false - //ImmutableList.of() ) ); diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index f6caf327332..3c651978129 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -120,6 +120,7 @@ public class DiscoveryModule implements Module */ public static void registerKey(Binder binder, Key key) { + DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key)); LifecycleModule.registerKey(binder, key); } diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java index b69fcd30212..8024907bbee 100644 --- a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java @@ -19,10 +19,8 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.io.CharStreams; import com.google.common.io.InputSupplier; @@ -32,16 +30,11 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; -import com.metamx.emitter.core.LoggingEmitter; -import com.metamx.emitter.core.LoggingEmitterConfig; -import com.metamx.emitter.service.ServiceEmitter; import io.airlift.command.Arguments; import io.airlift.command.Command; import io.druid.guice.LazySingleton; -import io.druid.guice.ManageLifecycle; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.HadoopDruidIndexerJob; -import io.druid.initialization.LogLevelAdjuster; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -55,7 +48,7 @@ import java.util.List; */ @Command( name = "hadoop", - description = "Runs the batch Hadoop Druid Indexer, see LINK GOES HERE for a description." + description = "Runs the batch Hadoop Druid Indexer, see https://github.com/metamx/druid/wiki/Batch-ingestion for a description." ) public class CliHadoopIndexer extends GuiceRunnable { @@ -122,21 +115,10 @@ public class CliHadoopIndexer extends GuiceRunnable public void run() { try { - LogLevelAdjuster.register(); - - final Injector injector = Initialization.makeInjectorWithModules( - getBaseInjector(), getModules() - ); - final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + Injector injector = makeInjector(); final HadoopDruidIndexerJob job = injector.getInstance(HadoopDruidIndexerJob.class); - try { - lifecycle.start(); - } - catch (Throwable t) { - log.error(t, "Error when starting up. 
Failing."); - System.exit(1); - } + Lifecycle lifecycle = initLifecycle(injector); job.run(); @@ -150,7 +132,7 @@ public class CliHadoopIndexer extends GuiceRunnable } catch (Exception e) { - throw com.google.common.base.Throwables.propagate(e); + throw Throwables.propagate(e); } } diff --git a/services/src/main/java/io/druid/cli/GuiceRunnable.java b/services/src/main/java/io/druid/cli/GuiceRunnable.java index f39bea6bac2..22d27cd6f7d 100644 --- a/services/src/main/java/io/druid/cli/GuiceRunnable.java +++ b/services/src/main/java/io/druid/cli/GuiceRunnable.java @@ -28,11 +28,6 @@ public abstract class GuiceRunnable implements Runnable this.baseInjector = injector; } - public Injector getBaseInjector() - { - return baseInjector; - } - protected abstract List getModules(); public Injector makeInjector() @@ -67,10 +62,4 @@ public abstract class GuiceRunnable implements Runnable throw Throwables.propagate(e); } } - - @Override - public void run() - { - initLifecycle(makeInjector()); - } } From b68c9fee83e711244238c1a1a6cecbb9a2d06935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 26 Sep 2013 17:38:11 -0700 Subject: [PATCH 08/10] fix layout --- docs/content/Loading-Your-Data.md | 514 ++++++++++++++++-------------- 1 file changed, 277 insertions(+), 237 deletions(-) diff --git a/docs/content/Loading-Your-Data.md b/docs/content/Loading-Your-Data.md index 2e27fad8303..126a871ccca 100644 --- a/docs/content/Loading-Your-Data.md +++ b/docs/content/Loading-Your-Data.md @@ -7,6 +7,7 @@ Druid can ingest data in three ways: via Kafka and a realtime node, via the inde ## Create Config Directories ## Each type of node needs its own config file and directory, so create them as subdirectories under the druid directory. + ```bash mkdir config mkdir config/realtime @@ -24,147 +25,171 @@ mkdir config/broker Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html). 1. Download Apache Kafka 0.7.2 from [http://kafka.apache.org/downloads.html](http://kafka.apache.org/downloads.html) -```bash -wget http://apache.spinellicreations.com/incubator/kafka/kafka-0.7.2-incubating/kafka-0.7.2-incubating-src.tgz -tar -xvzf kafka-0.7.2-incubating-src.tgz -cd kafka-0.7.2-incubating-src -``` + + ```bash + wget http://apache.spinellicreations.com/incubator/kafka/kafka-0.7.2-incubating/kafka-0.7.2-incubating-src.tgz + tar -xvzf kafka-0.7.2-incubating-src.tgz + cd kafka-0.7.2-incubating-src + ``` + 2. Build Kafka -```bash -./sbt update -./sbt package -``` + + ```bash + ./sbt update + ./sbt package + ``` + 3. Boot Kafka -```bash -cat config/zookeeper.properties -bin/zookeeper-server-start.sh config/zookeeper.properties -# in a new console -bin/kafka-server-start.sh config/server.properties -``` + + ```bash + cat config/zookeeper.properties + bin/zookeeper-server-start.sh config/zookeeper.properties + # in a new console + bin/kafka-server-start.sh config/server.properties + ``` + 4. Launch the console producer (so you can type in JSON kafka messages in a bit) -```bash -bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic druidtest -``` + + ```bash + bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic druidtest + ``` + ### Launching a Realtime Node 1. 
Create a valid configuration file similar to this called config/realtime/runtime.properties: -``` -druid.host=0.0.0.0:8080 -druid.port=8080 -com.metamx.emitter.logging=true + ``` + druid.host=0.0.0.0:8080 + druid.port=8080 -druid.processing.formatString=processing_%s -druid.processing.numThreads=1 -druid.processing.buffer.sizeBytes=10000000 + com.metamx.emitter.logging=true -#emitting, opaque marker -druid.service=example + druid.processing.formatString=processing_%s + druid.processing.numThreads=1 + druid.processing.buffer.sizeBytes=10000000 -druid.request.logging.dir=/tmp/example/log -druid.realtime.specFile=realtime.spec -com.metamx.emitter.logging=true -com.metamx.emitter.logging.level=debug + #emitting, opaque marker + druid.service=example -# below are dummy values when operating a realtime only node -druid.processing.numThreads=3 + druid.request.logging.dir=/tmp/example/log + druid.realtime.specFile=realtime.spec + com.metamx.emitter.logging=true + com.metamx.emitter.logging.level=debug -com.metamx.aws.accessKey=dummy_access_key -com.metamx.aws.secretKey=dummy_secret_key -druid.pusher.s3.bucket=dummy_s3_bucket + # below are dummy values when operating a realtime only node + druid.processing.numThreads=3 -druid.zk.service.host=localhost -druid.server.maxSize=300000000000 -druid.zk.paths.base=/druid -druid.database.segmentTable=prod_segments -druid.database.user=user -druid.database.password=diurd -druid.database.connectURI= -druid.host=127.0.0.1:8080 + com.metamx.aws.accessKey=dummy_access_key + com.metamx.aws.secretKey=dummy_secret_key + druid.pusher.s3.bucket=dummy_s3_bucket + + druid.zk.service.host=localhost + druid.server.maxSize=300000000000 + druid.zk.paths.base=/druid + druid.database.segmentTable=prod_segments + druid.database.user=user + druid.database.password=diurd + druid.database.connectURI= + druid.host=127.0.0.1:8080 + + ``` -``` 2. 
Create a valid realtime configuration file similar to this called realtime.spec: -```json -[{ - "schema" : { "dataSource":"druidtest", - "aggregators":[ {"type":"count", "name":"impressions"}, - {"type":"doubleSum","name":"wp","fieldName":"wp"}], - "indexGranularity":"minute", - "shardSpec" : { "type": "none" } }, - "config" : { "maxRowsInMemory" : 500000, - "intermediatePersistPeriod" : "PT10m" }, - "firehose" : { "type" : "kafka-0.7.2", - "consumerProps" : { "zk.connect" : "localhost:2181", - "zk.connectiontimeout.ms" : "15000", - "zk.sessiontimeout.ms" : "15000", - "zk.synctime.ms" : "5000", - "groupid" : "topic-pixel-local", - "fetch.size" : "1048586", - "autooffset.reset" : "largest", - "autocommit.enable" : "false" }, - "feed" : "druidtest", - "parser" : { "timestampSpec" : { "column" : "utcdt", "format" : "iso" }, - "data" : { "format" : "json" }, - "dimensionExclusions" : ["wp"] } }, - "plumber" : { "type" : "realtime", - "windowPeriod" : "PT10m", - "segmentGranularity":"hour", - "basePersistDirectory" : "/tmp/realtime/basePersist", - "rejectionPolicy": {"type": "messageTime"} } -}] -``` + ```json + [{ + "schema" : { "dataSource":"druidtest", + "aggregators":[ {"type":"count", "name":"impressions"}, + {"type":"doubleSum","name":"wp","fieldName":"wp"}], + "indexGranularity":"minute", + "shardSpec" : { "type": "none" } }, + "config" : { "maxRowsInMemory" : 500000, + "intermediatePersistPeriod" : "PT10m" }, + "firehose" : { "type" : "kafka-0.7.2", + "consumerProps" : { "zk.connect" : "localhost:2181", + "zk.connectiontimeout.ms" : "15000", + "zk.sessiontimeout.ms" : "15000", + "zk.synctime.ms" : "5000", + "groupid" : "topic-pixel-local", + "fetch.size" : "1048586", + "autooffset.reset" : "largest", + "autocommit.enable" : "false" }, + "feed" : "druidtest", + "parser" : { "timestampSpec" : { "column" : "utcdt", "format" : "iso" }, + "data" : { "format" : "json" }, + "dimensionExclusions" : ["wp"] } }, + "plumber" : { "type" : "realtime", + "windowPeriod" : "PT10m", + "segmentGranularity":"hour", + "basePersistDirectory" : "/tmp/realtime/basePersist", + "rejectionPolicy": {"type": "messageTime"} } + + }] + ``` + 3. Launch the realtime node -```bash -java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \ --Ddruid.realtime.specFile=config/realtime/realtime.spec \ --classpath lib/*:config/realtime com.metamx.druid.realtime.RealtimeMain -``` + + ```bash + java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \ + -Ddruid.realtime.specFile=config/realtime/realtime.spec \ + -classpath lib/*:config/realtime com.metamx.druid.realtime.RealtimeMain + ``` + 4. Paste data into the Kafka console producer -```json -{"utcdt": "2010-01-01T01:01:01", "wp": 1000, "gender": "male", "age": 100} -{"utcdt": "2010-01-01T01:01:02", "wp": 2000, "gender": "female", "age": 50} -{"utcdt": "2010-01-01T01:01:03", "wp": 3000, "gender": "male", "age": 20} -{"utcdt": "2010-01-01T01:01:04", "wp": 4000, "gender": "female", "age": 30} -{"utcdt": "2010-01-01T01:01:05", "wp": 5000, "gender": "male", "age": 40} -``` + + ```json + {"utcdt": "2010-01-01T01:01:01", "wp": 1000, "gender": "male", "age": 100} + {"utcdt": "2010-01-01T01:01:02", "wp": 2000, "gender": "female", "age": 50} + {"utcdt": "2010-01-01T01:01:03", "wp": 3000, "gender": "male", "age": 20} + {"utcdt": "2010-01-01T01:01:04", "wp": 4000, "gender": "female", "age": 30} + {"utcdt": "2010-01-01T01:01:05", "wp": 5000, "gender": "male", "age": 40} + ``` + 5. Watch the events as they are ingested by Druid's realtime node -```bash -... 
-2013-06-17 21:41:55,569 INFO [Global--0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2013-06-17T21:41:55.569Z","service":"example","host":"127.0.0.1","metric":"events/processed","value":5,"user2":"druidtest"}] -... -``` + + ```bash + ... + 2013-06-17 21:41:55,569 INFO [Global--0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2013-06-17T21:41:55.569Z","service":"example","host":"127.0.0.1","metric":"events/processed","value":5,"user2":"druidtest"}] + ... + ``` + 6. In a new console, edit a file called query.body: -```json -{ - "queryType": "groupBy", - "dataSource": "druidtest", - "granularity": "all", - "dimensions": [], - "aggregations": [ - { "type": "count", "name": "rows" }, - {"type": "longSum", "name": "imps", "fieldName": "impressions"}, - {"type": "doubleSum", "name": "wp", "fieldName": "wp"} - ], - "intervals": ["2010-01-01T00:00/2020-01-01T00"] -} -``` -7. Submit the query via curl -```bash -curl -X POST "http://localhost:8080/druid/v2/?pretty" \ --H 'content-type: application/json' -d @query.body -``` -8. View Result! -```json -[ { - "timestamp" : "2010-01-01T01:01:00.000Z", - "result" : { - "imps" : 20, - "wp" : 60000.0, - "rows" : 5 + + ```json + { + "queryType": "groupBy", + "dataSource": "druidtest", + "granularity": "all", + "dimensions": [], + "aggregations": [ + { "type": "count", "name": "rows" }, + {"type": "longSum", "name": "imps", "fieldName": "impressions"}, + {"type": "doubleSum", "name": "wp", "fieldName": "wp"} + ], + "intervals": ["2010-01-01T00:00/2020-01-01T00"] } -} ] -``` + ``` + +7. Submit the query via curl + + ```bash + curl -X POST "http://localhost:8080/druid/v2/?pretty" \ + -H 'content-type: application/json' -d @query.body + ``` + +8. View Result! + + ```json + [ { + "timestamp" : "2010-01-01T01:01:00.000Z", + "result" : { + "imps" : 20, + "wp" : 60000.0, + "rows" : 5 + } + } ] + ``` + Now you're ready for [Querying Your Data](Querying-Your-Data.html)! ## Loading Data with the HadoopDruidIndexer ## @@ -177,13 +202,16 @@ The setup for a single node, 'standalone' Hadoop cluster is available at [http:/ 1. If you don't already have it, download MySQL Community Server here: [http://dev.mysql.com/downloads/mysql/](http://dev.mysql.com/downloads/mysql/) 2. Install MySQL 3. Create a druid user and database + ```bash mysql -u root ``` + ```sql GRANT ALL ON druid.* TO 'druid'@'localhost' IDENTIFIED BY 'diurd'; CREATE database druid; ``` + The [Master](Master.html) node will create the tables it needs based on its configuration. ### Make sure you have ZooKeeper Running ### @@ -206,114 +234,123 @@ cd .. ``` ### Launch a Master Node ### + If you've already setup a realtime node, be aware that although you can run multiple node types on one physical computer, you must assign them unique ports. Having used 8080 for the [Realtime](Realtime.html) node, we use 8081 for the [Master](Master.html). 1. 
Setup a configuration file called config/master/runtime.properties similar to: -```bash -druid.host=0.0.0.0:8081 -druid.port=8081 -com.metamx.emitter.logging=true + ```bash + druid.host=0.0.0.0:8081 + druid.port=8081 -druid.processing.formatString=processing_%s -druid.processing.numThreads=1 -druid.processing.buffer.sizeBytes=10000000 + com.metamx.emitter.logging=true -#emitting, opaque marker -druid.service=example + druid.processing.formatString=processing_%s + druid.processing.numThreads=1 + druid.processing.buffer.sizeBytes=10000000 -druid.master.startDelay=PT60s -druid.request.logging.dir=/tmp/example/log -druid.realtime.specFile=realtime.spec -com.metamx.emitter.logging=true -com.metamx.emitter.logging.level=debug + # emitting, opaque marker + druid.service=example -# below are dummy values when operating a realtime only node -druid.processing.numThreads=3 + druid.master.startDelay=PT60s + druid.request.logging.dir=/tmp/example/log + druid.realtime.specFile=realtime.spec + com.metamx.emitter.logging=true + com.metamx.emitter.logging.level=debug -com.metamx.aws.accessKey=dummy_access_key -com.metamx.aws.secretKey=dummy_secret_key -druid.pusher.s3.bucket=dummy_s3_bucket + # below are dummy values when operating a realtime only node + druid.processing.numThreads=3 -druid.zk.service.host=localhost -druid.server.maxSize=300000000000 -druid.zk.paths.base=/druid -druid.database.segmentTable=prod_segments -druid.database.user=druid -druid.database.password=diurd -druid.database.connectURI=jdbc:mysql://localhost:3306/druid -druid.zk.paths.discoveryPath=/druid/discoveryPath -druid.database.ruleTable=rules -druid.database.configTable=config + com.metamx.aws.accessKey=dummy_access_key + com.metamx.aws.secretKey=dummy_secret_key + druid.pusher.s3.bucket=dummy_s3_bucket + + druid.zk.service.host=localhost + druid.server.maxSize=300000000000 + druid.zk.paths.base=/druid + druid.database.segmentTable=prod_segments + druid.database.user=druid + druid.database.password=diurd + druid.database.connectURI=jdbc:mysql://localhost:3306/druid + druid.zk.paths.discoveryPath=/druid/discoveryPath + druid.database.ruleTable=rules + druid.database.configTable=config + + # Path on local FS for storage of segments; dir will be created if needed + druid.paths.indexCache=/tmp/druid/indexCache + # Path on local FS for storage of segment metadata; dir will be created if needed + druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache + ``` -# Path on local FS for storage of segments; dir will be created if needed -druid.paths.indexCache=/tmp/druid/indexCache -# Path on local FS for storage of segment metadata; dir will be created if needed -druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache -``` 2. Launch the [Master](Master.html) node -```bash -java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \ --classpath lib/*:config/master \ -com.metamx.druid.http.MasterMain -``` + + ```bash + java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \ + -classpath lib/*:config/master \ + com.metamx.druid.http.MasterMain + ``` ### Launch a Compute/Historical Node ### + 1. 
Create a configuration file in config/compute/runtime.properties similar to: -```bash -druid.host=0.0.0.0:8082 -druid.port=8082 -com.metamx.emitter.logging=true + ```bash + druid.host=0.0.0.0:8082 + druid.port=8082 -druid.processing.formatString=processing_%s -druid.processing.numThreads=1 -druid.processing.buffer.sizeBytes=10000000 + com.metamx.emitter.logging=true -#emitting, opaque marker -druid.service=example + druid.processing.formatString=processing_%s + druid.processing.numThreads=1 + druid.processing.buffer.sizeBytes=10000000 -druid.request.logging.dir=/tmp/example/log -druid.realtime.specFile=realtime.spec -com.metamx.emitter.logging=true -com.metamx.emitter.logging.level=debug + # emitting, opaque marker + druid.service=example -# below are dummy values when operating a realtime only node -druid.processing.numThreads=3 + druid.request.logging.dir=/tmp/example/log + druid.realtime.specFile=realtime.spec + com.metamx.emitter.logging=true + com.metamx.emitter.logging.level=debug -com.metamx.aws.accessKey=dummy_access_key -com.metamx.aws.secretKey=dummy_secret_key -druid.pusher.s3.bucket=dummy_s3_bucket + # below are dummy values when operating a realtime only node + druid.processing.numThreads=3 -druid.zk.service.host=localhost -druid.server.maxSize=300000000000 -druid.zk.paths.base=/druid -druid.database.segmentTable=prod_segments -druid.database.user=druid -druid.database.password=diurd -druid.database.connectURI=jdbc:mysql://localhost:3306/druid -druid.zk.paths.discoveryPath=/druid/discoveryPath -druid.database.ruleTable=rules -druid.database.configTable=config + com.metamx.aws.accessKey=dummy_access_key + com.metamx.aws.secretKey=dummy_secret_key + druid.pusher.s3.bucket=dummy_s3_bucket + + druid.zk.service.host=localhost + druid.server.maxSize=300000000000 + druid.zk.paths.base=/druid + druid.database.segmentTable=prod_segments + druid.database.user=druid + druid.database.password=diurd + druid.database.connectURI=jdbc:mysql://localhost:3306/druid + druid.zk.paths.discoveryPath=/druid/discoveryPath + druid.database.ruleTable=rules + druid.database.configTable=config # Path on local FS for storage of segments; dir will be created if needed -druid.paths.indexCache=/tmp/druid/indexCache + druid.paths.indexCache=/tmp/druid/indexCache # Path on local FS for storage of segment metadata; dir will be created if needed -druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache + druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache # Setup local storage mode -druid.pusher.local.storageDirectory=/tmp/druid/localStorage -druid.pusher.local=true -``` + druid.pusher.local.storageDirectory=/tmp/druid/localStorage + druid.pusher.local=true + ``` + 2. Launch the compute node: -```bash -java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \ --classpath lib/*:config/compute \ -com.metamx.druid.http.ComputeMain -``` + + ```bash + java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \ + -classpath lib/*:config/compute \ + com.metamx.druid.http.ComputeMain + ``` ### Create a File of Records ### We can use the same records we have been, in a file called records.json: + ```json {"utcdt": "2010-01-01T01:01:01", "wp": 1000, "gender": "male", "age": 100} {"utcdt": "2010-01-01T01:01:02", "wp": 2000, "gender": "female", "age": 50} @@ -327,44 +364,47 @@ We can use the same records we have been, in a file called records.json: Now its time to run the Hadoop [Batch-ingestion](Batch-ingestion.html) job, HadoopDruidIndexer, which will fill a historical [Compute](Compute.html) node with data. 
First we'll need to configure the job. 1. Create a config called batchConfig.json similar to: -```json -{ - "dataSource": "druidtest", - "timestampColumn": "utcdt", - "timestampFormat": "iso", - "dataSpec": { - "format": "json", - "dimensions": ["gender", "age"] - }, - "granularitySpec": { - "type":"uniform", - "intervals":["2010-01-01T01/PT1H"], - "gran":"hour" - }, - "pathSpec": { "type": "static", - "paths": "/Users/rjurney/Software/druid/records.json" }, - "rollupSpec": { "aggs":[ {"type":"count", "name":"impressions"}, - {"type":"doubleSum","name":"wp","fieldName":"wp"} - ], - "rollupGranularity": "minute"}, - "workingPath": "/tmp/working_path", - "segmentOutputPath": "/tmp/segments", - "leaveIntermediate": "false", - "partitionsSpec": { - "targetPartitionSize": 5000000 - }, - "updaterJobSpec": { - "type":"db", - "connectURI":"jdbc:mysql://localhost:3306/druid", - "user":"druid", - "password":"diurd", - "segmentTable":"prod_segments" - } -} -``` -2. Now run the job, with the config pointing at batchConfig.json: -```bash -java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=realtime.spec -classpath lib/* com.metamx.druid.indexer.HadoopDruidIndexerMain batchConfig.json -``` -You can now move on to [Querying Your Data](Querying-Your-Data.html)! \ No newline at end of file + ```json + { + "dataSource": "druidtest", + "timestampColumn": "utcdt", + "timestampFormat": "iso", + "dataSpec": { + "format": "json", + "dimensions": ["gender", "age"] + }, + "granularitySpec": { + "type":"uniform", + "intervals":["2010-01-01T01/PT1H"], + "gran":"hour" + }, + "pathSpec": { "type": "static", + "paths": "/Users/rjurney/Software/druid/records.json" }, + "rollupSpec": { "aggs":[ {"type":"count", "name":"impressions"}, + {"type":"doubleSum","name":"wp","fieldName":"wp"} + ], + "rollupGranularity": "minute"}, + "workingPath": "/tmp/working_path", + "segmentOutputPath": "/tmp/segments", + "leaveIntermediate": "false", + "partitionsSpec": { + "targetPartitionSize": 5000000 + }, + "updaterJobSpec": { + "type":"db", + "connectURI":"jdbc:mysql://localhost:3306/druid", + "user":"druid", + "password":"diurd", + "segmentTable":"prod_segments" + } + } + ``` + +2. Now run the job, with the config pointing at batchConfig.json: + + ```bash + java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=realtime.spec -classpath lib/* com.metamx.druid.indexer.HadoopDruidIndexerMain batchConfig.json + ``` + +You can now move on to [Querying Your Data](Querying-Your-Data.html)! 
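
For completeness, the curl submission in the tutorial patch above maps directly onto plain JDK code. The sketch below is illustrative only — it is not part of any patch — and assumes a realtime node listening on localhost:8080 with the druidtest datasource from the tutorial; the class name is a placeholder.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Illustrative sketch, not part of the patches: posts the tutorial's groupBy
// query to a realtime node, mirroring the `curl -X POST` step above.
public class QueryExample
{
  public static void main(String[] args) throws Exception
  {
    // Same query body as query.body in the tutorial.
    final String queryBody =
        "{\"queryType\": \"groupBy\", \"dataSource\": \"druidtest\", \"granularity\": \"all\","
        + " \"dimensions\": [],"
        + " \"aggregations\": ["
        + "   {\"type\": \"count\", \"name\": \"rows\"},"
        + "   {\"type\": \"longSum\", \"name\": \"imps\", \"fieldName\": \"impressions\"},"
        + "   {\"type\": \"doubleSum\", \"name\": \"wp\", \"fieldName\": \"wp\"}],"
        + " \"intervals\": [\"2010-01-01T00:00/2020-01-01T00\"]}";

    // Endpoint and port come from the realtime node configured earlier.
    final URL url = new URL("http://localhost:8080/druid/v2/?pretty");
    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);

    // Write the query body, then stream the JSON result back.
    try (OutputStream out = conn.getOutputStream()) {
      out.write(queryBody.getBytes(StandardCharsets.UTF_8));
    }
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    }
  }
}
```

Run while the realtime node is up and ingesting, this prints the same grouped result shown in step 8 of the tutorial.
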
From e404295c1f921dac31c2c9c0855204e8b402c85b Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 26 Sep 2013 17:44:21 -0700 Subject: [PATCH 09/10] make indexing service work --- .../indexing/common/TaskToolboxFactory.java | 3 +- .../actions/RemoteTaskActionClient.java | 10 +- .../indexing/common/task/MergeTaskBase.java | 1 - .../config/ForkingTaskRunnerConfig.java | 11 +- .../http/IndexerCoordinatorResource.java | 8 +- .../http/OldIndexerCoordinatorResource.java | 6 +- .../scaling/EC2AutoScalingStrategy.java | 3 +- .../coordinator/scaling/ScalingStats.java | 12 +- .../indexing/worker/config/WorkerConfig.java | 9 -- .../worker/executor/ExecutorLifecycle.java | 1 + .../indexer_static/js/console-0.0.1.js | 8 +- .../io/druid/client/DruidServerConfig.java | 2 +- .../indexing/IndexingServiceClient.java | 14 +- .../curator/discovery/DiscoveryModule.java | 3 +- .../java/io/druid/server/QueryServlet.java | 2 - .../initialization/JettyServerModule.java | 17 ++- .../io/druid/server/master/DruidMaster.java | 9 +- .../server/master/DruidMasterConfig.java | 2 +- .../server/master/MasterSegmentSettings.java | 45 ++++--- .../java/io/druid/cli/CliHadoopIndexer.java | 4 +- .../main/java/io/druid/cli/CliOverlord.java | 2 + .../src/main/java/io/druid/cli/CliPeon.java | 124 +++++++++--------- .../druid/cli/convert/ConvertProperties.java | 3 +- 23 files changed, 154 insertions(+), 145 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index d20480b9de0..33e2a367c41 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.ServerView; +import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Processing; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; @@ -67,7 +68,7 @@ public class TaskToolboxFactory @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, SegmentLoaderFactory segmentLoaderFactory, - ObjectMapper objectMapper + @Json ObjectMapper objectMapper ) { this.config = config; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index 41639aa9989..a72ece1ae80 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -132,14 +132,6 @@ public class RemoteTaskActionClient implements TaskActionClient throw new ISE("Cannot find instance of indexer to talk to!"); } - return new URI( - instance.getScheme(), - null, - instance.getHost(), - instance.getPort(), - "/druid/indexer/v1/action", - null, - null - ); + return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action")); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index e597815a3d4..5936d45a278 100644 --- 
a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -125,7 +125,6 @@ public abstract class MergeTaskBase extends AbstractTask final File taskDir = toolbox.getTaskWorkDir(); try { - final long startTime = System.currentTimeMillis(); log.info( diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java index 930c6167f16..d48b6b212f6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java @@ -55,12 +55,19 @@ public class ForkingTaskRunnerConfig private String classpath = System.getProperty("java.class.path"); @JsonProperty - @Min(1024) @Max(65535) + @Min(1024) + @Max(65535) private int startPort = 8080; @JsonProperty @NotNull - List allowedPrefixes = Lists.newArrayList("com.metamx", "druid", "io.druid"); + List allowedPrefixes = Lists.newArrayList( + "com.metamx", + "druid", + "io.druid", + "user.timezone", + "file.encoding" + ); public int maxForks() { diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/http/IndexerCoordinatorResource.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/http/IndexerCoordinatorResource.java index 090997035b5..9a2638e8d67 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/http/IndexerCoordinatorResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/http/IndexerCoordinatorResource.java @@ -19,7 +19,6 @@ package io.druid.indexing.coordinator.http; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Collections2; @@ -89,7 +88,6 @@ public class IndexerCoordinatorResource private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskLogStreamer taskLogStreamer; private final JacksonConfigManager configManager; - private final ObjectMapper jsonMapper; private AtomicReference workerSetupDataRef = null; @@ -98,21 +96,20 @@ public class IndexerCoordinatorResource TaskMaster taskMaster, TaskStorageQueryAdapter taskStorageQueryAdapter, TaskLogStreamer taskLogStreamer, - JacksonConfigManager configManager, - ObjectMapper jsonMapper + JacksonConfigManager configManager ) throws Exception { this.taskMaster = taskMaster; this.taskStorageQueryAdapter = taskStorageQueryAdapter; this.taskLogStreamer = taskLogStreamer; this.configManager = configManager; - this.jsonMapper = jsonMapper; } @POST @Path("/merge") @Consumes("application/json") @Produces("application/json") + @Deprecated public Response doMerge(final Task task) { // legacy endpoint @@ -123,6 +120,7 @@ public class IndexerCoordinatorResource @Path("/index") @Consumes("application/json") @Produces("application/json") + @Deprecated public Response doIndex(final Task task) { return taskPost(task); diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java index 11df4499edf..b8a6f679df8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java @@ -19,7 +19,6 @@ package io.druid.indexing.coordinator.http; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import io.druid.common.config.JacksonConfigManager; import io.druid.indexing.common.tasklogs.TaskLogStreamer; @@ -39,10 +38,9 @@ public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource TaskMaster taskMaster, TaskStorageQueryAdapter taskStorageQueryAdapter, TaskLogStreamer taskLogStreamer, - JacksonConfigManager configManager, - ObjectMapper jsonMapper + JacksonConfigManager configManager ) throws Exception { - super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager, jsonMapper); + super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java index cda0da63568..8933fb16041 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java @@ -34,6 +34,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; +import io.druid.guice.annotations.Json; import io.druid.indexing.coordinator.setup.EC2NodeData; import io.druid.indexing.coordinator.setup.GalaxyUserData; import io.druid.indexing.coordinator.setup.WorkerSetupData; @@ -55,7 +56,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy @Inject public EC2AutoScalingStrategy( - ObjectMapper jsonMapper, + @Json ObjectMapper jsonMapper, AmazonEC2 amazonEC2Client, SimpleResourceManagementConfig config, Supplier workerSetupDataRef diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/ScalingStats.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/ScalingStats.java index 7e5e1c23fe1..7af8de7b6ab 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/ScalingStats.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/scaling/ScalingStats.java @@ -54,10 +54,14 @@ public class ScalingStats public ScalingStats(int capacity) { - this.recentEvents = MinMaxPriorityQueue - .orderedBy(comparator) - .maximumSize(capacity) - .create(); + if (capacity == 0) { + this.recentEvents = MinMaxPriorityQueue.orderedBy(comparator).create(); + } else { + this.recentEvents = MinMaxPriorityQueue + .orderedBy(comparator) + .maximumSize(capacity) + .create(); + } } public void addProvisionEvent(AutoScalingData data) diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/config/WorkerConfig.java b/indexing-service/src/main/java/io/druid/indexing/worker/config/WorkerConfig.java index f1e1bf4d32f..81f41d43393 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/config/WorkerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/config/WorkerConfig.java @@ -36,10 +36,6 @@ public class WorkerConfig @NotNull private String version = null; - @JsonProperty - @NotNull - private String overlordService = null; - @JsonProperty @Min(1) private int capacity = Runtime.getRuntime().availableProcessors() - 1; @@ -54,11 +50,6 @@ public class WorkerConfig return version; } - 
public String getOverlordService() - { - return overlordService; - } - public int getCapacity() { return capacity; diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java index 8f0fac4bfb8..5c3d15e9d2d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -123,6 +123,7 @@ public class ExecutorLifecycle jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus) ); + statusFile.getParentFile().mkdirs(); jsonMapper.writeValue(statusFile, taskStatus); return taskStatus; diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index b48dbf1dde4..e3ce86c85c9 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -3,22 +3,22 @@ var oTable = []; $(document).ready(function() { - $.get('/mmx/merger/v1/runningTasks', function(data) { + $.get('/druid/indexer/v1/runningTasks', function(data) { $('.running_loading').hide(); buildTable(data, $('#runningTable'), ["segments"]); }); - $.get('/mmx/merger/v1/pendingTasks', function(data) { + $.get('/druid/indexer/v1/pendingTasks', function(data) { $('.pending_loading').hide(); buildTable(data, $('#pendingTable'), ["segments"]); }); - $.get('/mmx/merger/v1/workers', function(data) { + $.get('/druid/indexer/v1/workers', function(data) { $('.workers_loading').hide(); buildTable(data, $('#workerTable')); }); - $.get('/mmx/merger/v1/scaling', function(data) { + $.get('/druid/indexer/v1/scaling', function(data) { $('.events_loading').hide(); buildTable(data, $('#eventTable')); }); diff --git a/server/src/main/java/io/druid/client/DruidServerConfig.java b/server/src/main/java/io/druid/client/DruidServerConfig.java index 14cfaee290a..089c05ff021 100644 --- a/server/src/main/java/io/druid/client/DruidServerConfig.java +++ b/server/src/main/java/io/druid/client/DruidServerConfig.java @@ -29,7 +29,7 @@ public class DruidServerConfig { @JsonProperty @Min(0) - private long maxSize = -1; + private long maxSize = 0; @JsonProperty private String tier = "_default_tier"; diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index abae59ee168..c345bdb2669 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -72,28 +72,28 @@ public class IndexingServiceClient } } - runQuery("merge", new ClientAppendQuery(dataSource, segments)); + runQuery(new ClientAppendQuery(dataSource, segments)); } public void killSegments(String dataSource, Interval interval) { - runQuery("index", new ClientKillQuery(dataSource, interval)); + runQuery(new ClientKillQuery(dataSource, interval)); } public void upgradeSegment(DataSegment dataSegment) { - runQuery("task", new ClientConversionQuery(dataSegment)); + runQuery(new ClientConversionQuery(dataSegment)); } public void upgradeSegments(String dataSource, Interval interval) { - runQuery("task", new ClientConversionQuery(dataSource, interval)); + runQuery(new ClientConversionQuery(dataSource, interval)); } - private InputStream runQuery(String 
endpoint, Object queryObject) + private InputStream runQuery(Object queryObject) { try { - return client.post(new URL(String.format("%s/%s", baseUrl(), endpoint))) + return client.post(new URL(String.format("%s/task", baseUrl()))) .setContent("application/json", jsonMapper.writeValueAsBytes(queryObject)) .go(RESPONSE_HANDLER) .get(); @@ -111,7 +111,7 @@ public class IndexingServiceClient throw new ISE("Cannot find instance of indexingService"); } - return String.format("http://%s:%s/druid/indexer/v1", instance.getHost(), instance.getPort()); + return String.format("http://%s/druid/indexer/v1", instance.getHost()); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 3c651978129..36e13f2e144 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -121,7 +121,7 @@ public class DiscoveryModule implements Module public static void registerKey(Binder binder, Key key) { DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key)); - LifecycleModule.registerKey(binder, key); + LifecycleModule.register(binder, ServiceAnnouncer.class); } @Override @@ -134,7 +134,6 @@ public class DiscoveryModule implements Module // Build the binder so that it will at a minimum inject an empty set. DruidBinders.discoveryAnnouncementBinder(binder); - // We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect binder.bind(ServiceAnnouncer.class) .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) .in(LazySingleton.class); diff --git a/server/src/main/java/io/druid/server/QueryServlet.java b/server/src/main/java/io/druid/server/QueryServlet.java index 49d32ebe0dd..53a5b17a260 100644 --- a/server/src/main/java/io/druid/server/QueryServlet.java +++ b/server/src/main/java/io/druid/server/QueryServlet.java @@ -116,11 +116,9 @@ public class QueryServlet extends HttpServlet emitter.emit( new ServiceMetricEvent.Builder() .setUser2(query.getDataSource()) - //.setUser3(originatorType) .setUser4(query.getType()) .setUser5(query.getIntervals().get(0).toString()) .setUser6(String.valueOf(query.hasFilters())) - //.setUser8(originatorId) .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) .build("request/time", requestTime) ); diff --git a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java index d7e16354c22..246645dac0c 100644 --- a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java +++ b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java @@ -19,6 +19,8 @@ package io.druid.server.initialization; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import com.google.inject.Binder; @@ -28,6 +30,7 @@ import com.google.inject.Injector; import com.google.inject.Provides; import com.google.inject.ProvisionException; import com.google.inject.Scopes; +import com.google.inject.Singleton; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.sun.jersey.api.core.DefaultResourceConfig; @@ -39,7 +42,9 @@ import io.druid.guice.Jerseys; import 
io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.annotations.JSR311Resource; +import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Self; +import io.druid.jackson.DefaultObjectMapper; import io.druid.server.DruidNode; import io.druid.server.StatusResource; import org.eclipse.jetty.server.Connector; @@ -95,7 +100,8 @@ public class JettyServerModule extends JerseyServletModule } } - @Provides @LazySingleton + @Provides + @LazySingleton public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config) { JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class); @@ -133,6 +139,15 @@ public class JettyServerModule extends JerseyServletModule return server; } + @Provides + @Singleton + public JacksonJsonProvider getJacksonJsonProvider(@Json ObjectMapper objectMapper) + { + final JacksonJsonProvider provider = new JacksonJsonProvider(); + provider.setMapper(objectMapper); + return provider; + } + private static Server makeJettyServer(@Self DruidNode node, ServerConfig config) { final QueuedThreadPool threadPool = new QueuedThreadPool(); diff --git a/server/src/main/java/io/druid/server/master/DruidMaster.java b/server/src/main/java/io/druid/server/master/DruidMaster.java index 937c1c6bfa1..0c1a5d5419e 100644 --- a/server/src/main/java/io/druid/server/master/DruidMaster.java +++ b/server/src/main/java/io/druid/server/master/DruidMaster.java @@ -161,7 +161,7 @@ public class DruidMaster this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); this.leaderLatch = new AtomicReference<>(null); - this.segmentSettingsAtomicReference= new AtomicReference<>(null); + this.segmentSettingsAtomicReference = new AtomicReference<>(null); this.loadManagementPeons = loadQueuePeonMap; } @@ -471,10 +471,13 @@ public class DruidMaster serverInventoryView.start(); final List> masterRunnables = Lists.newArrayList(); - segmentSettingsAtomicReference = configManager.watch(MasterSegmentSettings.CONFIG_KEY, MasterSegmentSettings.class,new MasterSegmentSettings.Builder().build()); + segmentSettingsAtomicReference = configManager.watch( + MasterSegmentSettings.CONFIG_KEY, + MasterSegmentSettings.class, + new MasterSegmentSettings.Builder().build() + ); masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod())); if (indexingServiceClient != null) { - masterRunnables.add( Pair.of( new MasterIndexingServiceRunnable( diff --git a/server/src/main/java/io/druid/server/master/DruidMasterConfig.java b/server/src/main/java/io/druid/server/master/DruidMasterConfig.java index d279b91d5c1..49594850a91 100644 --- a/server/src/main/java/io/druid/server/master/DruidMasterConfig.java +++ b/server/src/main/java/io/druid/server/master/DruidMasterConfig.java @@ -42,7 +42,7 @@ public abstract class DruidMasterConfig @Default("PT1800s") public abstract Duration getMasterSegmentMergerPeriod(); - @Config("druid.master.merger.on") + @Config("druid.master.merge.on") public boolean isMergeSegments() { return false; diff --git a/server/src/main/java/io/druid/server/master/MasterSegmentSettings.java b/server/src/main/java/io/druid/server/master/MasterSegmentSettings.java index a31f9b8dce8..68c733e3c0a 100644 --- a/server/src/main/java/io/druid/server/master/MasterSegmentSettings.java +++ b/server/src/main/java/io/druid/server/master/MasterSegmentSettings.java @@ -24,8 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; public class MasterSegmentSettings { public 
static final String CONFIG_KEY = "master.dynamicConfigs"; - private long millisToWaitBeforeDeleting=15 * 60 * 1000L; - private long mergeBytesLimit= 100000000L; + private long millisToWaitBeforeDeleting = 15 * 60 * 1000L; + private long mergeBytesLimit = 100000000L; private int mergeSegmentsLimit = Integer.MAX_VALUE; private int maxSegmentsToMove = 5; private boolean emitBalancingStats = false; @@ -39,11 +39,11 @@ public class MasterSegmentSettings @JsonProperty("emitBalancingStats") Boolean emitBalancingStats ) { - this.maxSegmentsToMove=maxSegmentsToMove; - this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting; - this.mergeSegmentsLimit=mergeSegmentsLimit; - this.mergeBytesLimit=mergeBytesLimit; - this.emitBalancingStats = emitBalancingStats; + this.maxSegmentsToMove = maxSegmentsToMove; + this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; + this.mergeSegmentsLimit = mergeSegmentsLimit; + this.mergeBytesLimit = mergeBytesLimit; + this.emitBalancingStats = emitBalancingStats; } public static String getConfigKey() @@ -81,7 +81,6 @@ public class MasterSegmentSettings return maxSegmentsToMove; } - public static class Builder { public static final String CONFIG_KEY = "master.dynamicConfigs"; @@ -93,14 +92,16 @@ public class MasterSegmentSettings public Builder() { - this.millisToWaitBeforeDeleting=15 * 60 * 1000L; - this.mergeBytesLimit= 100000000L; - this.mergeSegmentsLimit= Integer.MAX_VALUE; - this.maxSegmentsToMove = 5; - this.emitBalancingStats = false; + this(15 * 60 * 1000L, 100000000L, Integer.MAX_VALUE, 5, false); } - public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats) + public Builder( + long millisToWaitBeforeDeleting, + long mergeBytesLimit, + int mergeSegmentsLimit, + int maxSegmentsToMove, + boolean emitBalancingStats + ) { this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; this.mergeBytesLimit = mergeBytesLimit; @@ -111,31 +112,37 @@ public class MasterSegmentSettings public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting) { - this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting; + this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; return this; } public Builder withMergeBytesLimit(long mergeBytesLimit) { - this.mergeBytesLimit=mergeBytesLimit; + this.mergeBytesLimit = mergeBytesLimit; return this; } public Builder withMergeSegmentsLimit(int mergeSegmentsLimit) { - this.mergeSegmentsLimit=mergeSegmentsLimit; + this.mergeSegmentsLimit = mergeSegmentsLimit; return this; } public Builder withMaxSegmentsToMove(int maxSegmentsToMove) { - this.maxSegmentsToMove=maxSegmentsToMove; + this.maxSegmentsToMove = maxSegmentsToMove; return this; } public MasterSegmentSettings build() { - return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats); + return new MasterSegmentSettings( + millisToWaitBeforeDeleting, + mergeBytesLimit, + mergeSegmentsLimit, + maxSegmentsToMove, + emitBalancingStats + ); } } } diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java index 8024907bbee..ec5db65ec97 100644 --- a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java @@ -52,7 +52,7 @@ import java.util.List; ) public class CliHadoopIndexer extends GuiceRunnable { - @Arguments(description = "A JSON object or the path to a file that 
contains a JSON object")
+  @Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true)
   private String argumentSpec;

   private static final Logger log = new Logger(CliHadoopIndexer.class);
@@ -78,8 +78,6 @@ public class CliHadoopIndexer extends GuiceRunnable
   @LazySingleton
   public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig()
   {
-    Preconditions.checkNotNull(argumentSpec, "argumentSpec");
-
     try {
       if (argumentSpec.startsWith("{")) {
         return HadoopDruidIndexerConfig.fromString(argumentSpec);
diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java
index a4d34bb6b0c..446283e0dee 100644
--- a/services/src/main/java/io/druid/cli/CliOverlord.java
+++ b/services/src/main/java/io/druid/cli/CliOverlord.java
@@ -57,6 +57,7 @@ import io.druid.indexing.coordinator.TaskRunnerFactory;
 import io.druid.indexing.coordinator.TaskStorage;
 import io.druid.indexing.coordinator.TaskStorageQueryAdapter;
 import io.druid.indexing.coordinator.http.IndexerCoordinatorResource;
+import io.druid.indexing.coordinator.http.OldIndexerCoordinatorResource;
 import io.druid.indexing.coordinator.http.OverlordRedirectInfo;
 import io.druid.indexing.coordinator.scaling.AutoScalingStrategy;
 import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
@@ -139,6 +140,7 @@ public class CliOverlord extends ServerRunnable
         binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());

         Jerseys.addResource(binder, IndexerCoordinatorResource.class);
+        Jerseys.addResource(binder, OldIndexerCoordinatorResource.class);

         LifecycleModule.register(binder, Server.class);
       }
diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java
index 1d5e1035232..f130f190950 100644
--- a/services/src/main/java/io/druid/cli/CliPeon.java
+++ b/services/src/main/java/io/druid/cli/CliPeon.java
@@ -22,7 +22,6 @@ package io.druid.cli;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
-import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
@@ -35,6 +34,7 @@ import io.airlift.command.Option;
 import io.druid.guice.Jerseys;
 import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.LazySingleton;
+import io.druid.guice.LifecycleModule;
 import io.druid.guice.ManageLifecycle;
 import io.druid.guice.NodeTypeConfig;
 import io.druid.guice.PolyBind;
@@ -52,13 +52,13 @@ import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
 import io.druid.indexing.worker.executor.ChatHandlerResource;
 import io.druid.indexing.worker.executor.ExecutorLifecycle;
 import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
-import io.druid.initialization.LogLevelAdjuster;
 import io.druid.query.QuerySegmentWalker;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.loading.S3DataSegmentKiller;
 import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.loading.StorageLocationConfig;
 import io.druid.server.initialization.JettyServerInitializer;
+import org.eclipse.jetty.server.Server;

 import java.io.File;
 import java.util.Arrays;
@@ -71,7 +71,7 @@ import java.util.List;
     description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
                   + "This should rarely, if ever, be used directly."
 )
-public class CliPeon implements Runnable
+public class CliPeon extends GuiceRunnable
 {
   @Arguments(description = "task.json status.json", required = true)
   public List<String> taskAndStatusFile;
@@ -79,74 +79,71 @@ public class CliPeon implements Runnable
   @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
   public String nodeType = "indexer-executor";

-  private Injector injector;
-
-  @Inject
-  public void configure(Injector injector)
-  {
-    this.injector = injector;
-  }
-
   private static final Logger log = new Logger(CliPeon.class);

-  protected Injector getInjector()
+  public CliPeon()
   {
-    return Initialization.makeInjectorWithModules(
-        injector,
-        ImmutableList.of(
-            new Module()
-            {
-              @Override
-              public void configure(Binder binder)
-              {
-                PolyBind.createChoice(
-                    binder,
-                    "druid.indexer.task.chathandler.type",
-                    Key.get(ChatHandlerProvider.class),
-                    Key.get(NoopChatHandlerProvider.class)
-                );
-                final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
-                    binder, Key.get(ChatHandlerProvider.class)
-                );
-                handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class);
-                handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class);
+    super(log);
+  }

-                binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
+  @Override
+  protected List<Object> getModules()
+  {
+    return ImmutableList.<Object>of(
+        new Module()
+        {
+          @Override
+          public void configure(Binder binder)
+          {
+            PolyBind.createChoice(
+                binder,
+                "druid.indexer.task.chathandler.type",
+                Key.get(ChatHandlerProvider.class),
+                Key.get(NoopChatHandlerProvider.class)
+            );
+            final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
+                binder, Key.get(ChatHandlerProvider.class)
+            );
+            handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class);
+            handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class);

-                JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
-                JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class);
+            binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);

-                binder.bind(TaskActionClientFactory.class)
-                      .to(RemoteTaskActionClientFactory.class)
-                      .in(LazySingleton.class);
-                binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
+            JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
+            JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class);

-                binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);
+            binder.bind(TaskActionClientFactory.class)
+                  .to(RemoteTaskActionClientFactory.class)
+                  .in(LazySingleton.class);
+            binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);

-                binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
-                binder.bind(ExecutorLifecycleConfig.class).toInstance(
-                    new ExecutorLifecycleConfig()
-                        .setTaskFile(new File(taskAndStatusFile.get(0)))
-                        .setStatusFile(new File(taskAndStatusFile.get(1)))
-                );
+            binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);

-                binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
-                binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
-                binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
+            binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
+            binder.bind(ExecutorLifecycleConfig.class).toInstance(
+                new ExecutorLifecycleConfig()
+                    .setTaskFile(new File(taskAndStatusFile.get(0)))
+                    .setStatusFile(new File(taskAndStatusFile.get(1)))
+            );

-                // Override the default SegmentLoaderConfig because we don't actually care about the
-                // configuration based locations. This will override them anyway. This is also stopping
-                // configuration of other parameters, but I don't think that's actually a problem.
-                // Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
-                binder.bind(SegmentLoaderConfig.class)
-                      .toInstance(new SegmentLoaderConfig().withLocations(Arrays.asList()));
+            binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
+            binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
+            binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);

-                binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
-                Jerseys.addResource(binder, ChatHandlerResource.class);
-                binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
-              }
-            }
-        )
+            // Override the default SegmentLoaderConfig because we don't actually care about the
+            // configuration based locations. This will override them anyway. This is also stopping
+            // configuration of other parameters, but I don't think that's actually a problem.
+            // Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
+            binder.bind(SegmentLoaderConfig.class)
+                  .toInstance(new SegmentLoaderConfig().withLocations(Arrays.asList()));
+
+            binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
+            Jerseys.addResource(binder, ChatHandlerResource.class);
+            binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
+
+            LifecycleModule.register(binder, Server.class);
+          }
+        }
     );
   }
@@ -154,13 +151,10 @@
   public void run()
   {
     try {
-      LogLevelAdjuster.register();
-
-      final Injector injector = getInjector();
-      final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
+      Injector injector = makeInjector();
+      Lifecycle lifecycle = initLifecycle(injector);

       try {
-        lifecycle.start();
         injector.getInstance(ExecutorLifecycle.class).join();
         lifecycle.stop();
       }
diff --git a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
index a5d5944554d..7c97b4a8012 100644
--- a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
+++ b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
@@ -91,7 +91,7 @@ public class ConvertProperties implements Runnable
       new Rename("druid.indexer.maxPendingTaskDuration", "druid.indexer.autoscale.pendingTaskTimeout"),
       new Rename("druid.indexer.worker.version", "druid.indexer.autoscale.workerVersion"),
       new Rename("druid.indexer.worker.port", "druid.indexer.autoscale.workerPort"),
-      new Rename("druid.worker.masterService", "druid.worker.overlordService"),
+      new Rename("druid.worker.masterService", "druid.selectors.indexing.serviceName"),
       new ChatHandlerConverter(),
       new Rename("druid.indexer.baseDir", "druid.indexer.task.baseDir"),
       new Rename("druid.indexer.taskDir", "druid.indexer.task.taskDir"),
@@ -100,6 +100,7 @@
       new Rename("druid.worker.taskActionClient.retry.minWaitMillis", "druid.worker.taskActionClient.retry.minWait"),
       new Rename("druid.worker.taskActionClient.retry.maxWaitMillis", "druid.worker.taskActionClient.retry.maxWait"),
       new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName"),
+      new Rename("druid.master.merger.on", "druid.master.merge.on"),
       new DataSegmentPusherDefaultConverter(),
       new Rename("druid.pusher.hdfs.storageDirectory", "druid.pusher.storageDirectory"),
       new Rename("druid.pusher.cassandra.host", "druid.pusher.host"),

From 0b04325ee89227312f57966cd1cf2d010e671a77 Mon Sep 17 00:00:00 2001
From: fjy
Date: Fri, 27 Sep 2013 10:17:45 -0700
Subject: [PATCH 10/10] fix things up according to code review comments

---
 .../main/java/io/druid/indexing/common/TaskToolboxFactory.java | 2 +-
 services/src/main/java/io/druid/cli/CliPeon.java               | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
index 33e2a367c41..ca00dccaf91 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
@@ -68,7 +68,7 @@ public class TaskToolboxFactory
       @Processing ExecutorService queryExecutorService,
       MonitorScheduler monitorScheduler,
       SegmentLoaderFactory segmentLoaderFactory,
-      @Json ObjectMapper objectMapper
+      ObjectMapper objectMapper
   )
   {
     this.config = config;
diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java
index f130f190950..b4b9ce4e661 100644
--- a/services/src/main/java/io/druid/cli/CliPeon.java
+++ b/services/src/main/java/io/druid/cli/CliPeon.java
@@ -152,9 +152,10 @@ public class CliPeon extends GuiceRunnable
   {
     try {
       Injector injector = makeInjector();
-      Lifecycle lifecycle = initLifecycle(injector);

       try {
+        Lifecycle lifecycle = initLifecycle(injector);
+
         injector.getInstance(ExecutorLifecycle.class).join();
         lifecycle.stop();
       }