diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index b2613e13064..41639aa9989 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -27,8 +27,8 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.ToStringResponseHandler; -import io.druid.client.indexing.IndexingServiceSelector; import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -42,7 +42,7 @@ public class RemoteTaskActionClient implements TaskActionClient { private final Task task; private final HttpClient httpClient; - private final IndexingServiceSelector serviceProvider; + private final ServerDiscoverySelector selector; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @@ -51,14 +51,14 @@ public class RemoteTaskActionClient implements TaskActionClient public RemoteTaskActionClient( Task task, HttpClient httpClient, - IndexingServiceSelector serviceProvider, + ServerDiscoverySelector selector, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { this.task = task; this.httpClient = httpClient; - this.serviceProvider = serviceProvider; + this.selector = selector; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } @@ -127,7 +127,7 @@ public class RemoteTaskActionClient implements TaskActionClient private URI getServiceUri() throws Exception { - final Server instance = serviceProvider.pick(); + final Server instance = selector.pick(); if (instance == null) { throw new ISE("Cannot find instance of indexer to talk to!"); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java index 4ae53a595b9..033c1263ca3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java @@ -22,7 +22,8 @@ package io.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.metamx.http.client.HttpClient; -import io.druid.client.indexing.IndexingServiceSelector; +import io.druid.client.indexing.IndexingService; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -32,20 +33,20 @@ import io.druid.indexing.common.task.Task; public class RemoteTaskActionClientFactory implements TaskActionClientFactory { private final HttpClient httpClient; - private final IndexingServiceSelector serviceProvider; + private final ServerDiscoverySelector selector; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @Inject public RemoteTaskActionClientFactory( @Global HttpClient httpClient, - IndexingServiceSelector serviceProvider, + @IndexingService ServerDiscoverySelector selector, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { this.httpClient = httpClient; - this.serviceProvider = serviceProvider; + this.selector = selector; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } @@ -53,6 +54,6 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory @Override public TaskActionClient create(Task task) { - return new RemoteTaskActionClient(task, httpClient, serviceProvider, retryPolicyFactory, jsonMapper); + return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 628ed978c42..abae59ee168 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -26,8 +26,8 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.InputStreamResponseHandler; -import io.druid.client.selector.DiscoverySelector; import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -43,18 +43,18 @@ public class IndexingServiceClient private final HttpClient client; private final ObjectMapper jsonMapper; - private final DiscoverySelector serviceProvider; + private final ServerDiscoverySelector selector; @Inject public IndexingServiceClient( @Global HttpClient client, ObjectMapper jsonMapper, - @IndexingService DiscoverySelector serviceProvider + @IndexingService ServerDiscoverySelector selector ) { this.client = client; this.jsonMapper = jsonMapper; - this.serviceProvider = serviceProvider; + this.selector = selector; } public void mergeSegments(List segments) @@ -106,7 +106,7 @@ public class IndexingServiceClient private String baseUrl() { try { - final Server instance = serviceProvider.pick(); + final Server instance = selector.pick(); if (instance == null) { throw new ISE("Cannot find instance of indexingService"); } diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 3f577f2d64f..a632392fbac 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -59,7 +59,7 @@ import java.util.concurrent.ThreadFactory; /** * The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be * automatically announced at the end of the lifecycle start. - * + *

* In order for this to work a ServiceAnnouncer instance *must* be injected and instantiated first. * This can often be achieved by registering ServiceAnnouncer.class with the LifecycleModule. */ @@ -69,7 +69,7 @@ public class DiscoveryModule implements Module /** * Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle. - * + *

* That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class)) automatically. * Announcement will happen in the LAST stage of the Lifecycle @@ -81,7 +81,7 @@ public class DiscoveryModule implements Module /** * Requests that the annotated DruidNode instance be injected and published as part of the lifecycle. - * + *

* That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * Announcement will happen in the LAST stage of the Lifecycle @@ -95,7 +95,7 @@ public class DiscoveryModule implements Module /** * Requests that the annotated DruidNode instance be injected and published as part of the lifecycle. - * + *

* That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * Announcement will happen in the LAST stage of the Lifecycle @@ -109,7 +109,7 @@ public class DiscoveryModule implements Module /** * Requests that the keyed DruidNode instance be injected and published as part of the lifecycle. - * + *

* That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * Announcement will happen in the LAST stage of the Lifecycle @@ -137,7 +137,9 @@ public class DiscoveryModule implements Module .asEagerSingleton(); } - @Provides @LazySingleton @Named(NAME) + @Provides + @LazySingleton + @Named(NAME) public CuratorServiceAnnouncer getServiceAnnouncer( final CuratorServiceAnnouncer announcer, final Injector injector, @@ -181,7 +183,8 @@ public class DiscoveryModule implements Module return announcer; } - @Provides @LazySingleton + @Provides + @LazySingleton public ServiceDiscovery getServiceDiscovery( CuratorFramework curator, CuratorDiscoveryConfig config, @@ -217,13 +220,21 @@ public class DiscoveryModule implements Module throw Throwables.propagate(e); } } - }, - Lifecycle.Stage.LAST + } ); return serviceDiscovery; } + @Provides + @LazySingleton + public ServerDiscoveryFactory getServerDiscoveryFactory( + ServiceDiscovery serviceDiscovery + ) + { + return new ServerDiscoveryFactory(serviceDiscovery); + } + private static class NoopServiceDiscovery implements ServiceDiscovery { @Override diff --git a/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java new file mode 100644 index 00000000000..c436289e70e --- /dev/null +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java @@ -0,0 +1,72 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.curator.discovery; + +import com.google.inject.Inject; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceProvider; + +import java.io.IOException; + +/** + */ +public class ServerDiscoveryFactory +{ + private final ServiceDiscovery serviceDiscovery; + + @Inject + public ServerDiscoveryFactory(ServiceDiscovery serviceDiscovery) + { + this.serviceDiscovery = serviceDiscovery; + } + + public ServerDiscoverySelector createSelector(String serviceName) + { + if (serviceName == null) { + return new ServerDiscoverySelector(new NoopServiceProvider()); + } + + final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).build(); + return new ServerDiscoverySelector(serviceProvider); + } + + private static class NoopServiceProvider implements ServiceProvider + { + @Override + public void start() throws Exception + { + // do nothing + } + + @Override + public ServiceInstance getInstance() throws Exception + { + return null; + } + + @Override + public void close() throws IOException + { + // do nothing + } + } + +} diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java similarity index 84% rename from server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java rename to server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java index 7c5eaaf8d72..b7cd72abf17 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java @@ -17,9 +17,8 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.client.indexing; +package io.druid.curator.discovery; -import com.google.inject.Inject; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; @@ -28,21 +27,18 @@ import io.druid.client.selector.Server; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; -import javax.annotation.Nullable; import java.io.IOException; /** -*/ -public class IndexingServiceSelector implements DiscoverySelector + */ +public class ServerDiscoverySelector implements DiscoverySelector { - private static final Logger log = new Logger(IndexingServiceSelector.class); + private static final Logger log = new Logger(ServerDiscoverySelector.class); private final ServiceProvider serviceProvider; - @Inject - public IndexingServiceSelector( - @Nullable @IndexingService ServiceProvider serviceProvider - ) { + public ServerDiscoverySelector(ServiceProvider serviceProvider) + { this.serviceProvider = serviceProvider; } @@ -54,7 +50,12 @@ public class IndexingServiceSelector implements DiscoverySelector instance = serviceProvider.getInstance(); } catch (Exception e) { - log.info(e, ""); + log.info(e, "Exception getting instance"); + return null; + } + + if (instance == null) { + log.error("No server instance found"); return null; } diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index 60e0c15281b..10535bb9d87 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -22,17 +22,10 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; -import com.google.inject.TypeLiteral; import io.druid.client.indexing.IndexingService; -import io.druid.client.indexing.IndexingServiceSelector; import io.druid.client.indexing.IndexingServiceSelectorConfig; -import io.druid.client.selector.DiscoverySelector; -import io.druid.client.selector.Server; -import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; - -import java.io.IOException; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; /** */ @@ -42,42 +35,16 @@ public class IndexingServiceDiscoveryModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class); - binder.bind(new TypeLiteral>(){}) - .annotatedWith(IndexingService.class) - .to(IndexingServiceSelector.class); - - binder.bind(IndexingServiceSelector.class).in(ManageLifecycle.class); } @Provides - @LazySingleton @IndexingService - public ServiceProvider getServiceProvider( + @IndexingService + @ManageLifecycle + public ServerDiscoverySelector getServiceProvider( IndexingServiceSelectorConfig config, - ServiceDiscovery serviceDiscovery + ServerDiscoveryFactory serverDiscoveryFactory ) { - if (config.getServiceName() == null) { - return new ServiceProvider() - { - @Override - public void start() throws Exception - { - - } - - @Override - public ServiceInstance getInstance() throws Exception - { - return null; - } - - @Override - public void close() throws IOException - { - - } - }; - } - return serviceDiscovery.serviceProviderBuilder().serviceName(config.getServiceName()).build(); + return serverDiscoveryFactory.createSelector(config.getServiceName()); } } diff --git a/server/src/main/java/io/druid/server/initialization/ExtensionsConfig.java b/server/src/main/java/io/druid/server/initialization/ExtensionsConfig.java index 0973a249434..46403358758 100644 --- a/server/src/main/java/io/druid/server/initialization/ExtensionsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ExtensionsConfig.java @@ -29,6 +29,10 @@ import java.util.List; */ public class ExtensionsConfig { + @JsonProperty + @NotNull + private boolean searchCurrentClassloader = true; + @JsonProperty @NotNull private List coordinates = ImmutableList.of(); @@ -44,6 +48,11 @@ public class ExtensionsConfig "https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local" ); + public boolean searchCurrentClassloader() + { + return searchCurrentClassloader; + } + public List getCoordinates() { return coordinates; diff --git a/services/src/main/java/io/druid/cli/Initialization.java b/services/src/main/java/io/druid/cli/Initialization.java index 8b1f1fa7640..76f502b1563 100644 --- a/services/src/main/java/io/druid/cli/Initialization.java +++ b/services/src/main/java/io/druid/cli/Initialization.java @@ -97,6 +97,13 @@ public class final TeslaAether aether = getAetherClient(config); List retVal = Lists.newArrayList(); + if (config.searchCurrentClassloader()) { + for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) { + log.info("Adding local module[%s]", module.getClass()); + retVal.add(module); + } + } + for (String coordinate : config.getCoordinates()) { log.info("Loading extension[%s]", coordinate); try { @@ -139,14 +146,13 @@ public class for (Artifact artifact : artifacts) { if (!exclusions.contains(artifact.getGroupId())) { urls.add(artifact.getFile().toURI().toURL()); - } - else { + } else { log.error("Skipped Artifact[%s]", artifact); } } for (URL url : urls) { - log.error("Added URL[%s]", url); + log.info("Added URL[%s]", url); } loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader()); @@ -243,7 +249,8 @@ public class private final ObjectMapper smileMapper; private final List modules; - public ModuleList(Injector baseInjector) { + public ModuleList(Injector baseInjector) + { this.baseInjector = baseInjector; this.jsonMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Json.class)); this.smileMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Smile.class)); @@ -260,24 +267,19 @@ public class if (input instanceof DruidModule) { baseInjector.injectMembers(input); modules.add(registerJacksonModules(((DruidModule) input))); - } - else if (input instanceof Module) { + } else if (input instanceof Module) { baseInjector.injectMembers(input); modules.add((Module) input); - } - else if (input instanceof Class) { + } else if (input instanceof Class) { if (DruidModule.class.isAssignableFrom((Class) input)) { modules.add(registerJacksonModules(baseInjector.getInstance((Class) input))); - } - else if (Module.class.isAssignableFrom((Class) input)) { + } else if (Module.class.isAssignableFrom((Class) input)) { modules.add(baseInjector.getInstance((Class) input)); return; - } - else { + } else { throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class); } - } - else { + } else { throw new ISE("Unknown module type[%s]", input.getClass()); } }