Merge pull request #244 from metamx/guice-discovery

Refactor how server service discovery is done
This commit is contained in:
cheddar 2013-09-24 14:58:34 -07:00
commit 9b57b4e1f3
9 changed files with 152 additions and 89 deletions

View File

@ -27,8 +27,8 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler; import com.metamx.http.client.response.ToStringResponseHandler;
import io.druid.client.indexing.IndexingServiceSelector;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
@ -42,7 +42,7 @@ public class RemoteTaskActionClient implements TaskActionClient
{ {
private final Task task; private final Task task;
private final HttpClient httpClient; private final HttpClient httpClient;
private final IndexingServiceSelector serviceProvider; private final ServerDiscoverySelector selector;
private final RetryPolicyFactory retryPolicyFactory; private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
@ -51,14 +51,14 @@ public class RemoteTaskActionClient implements TaskActionClient
public RemoteTaskActionClient( public RemoteTaskActionClient(
Task task, Task task,
HttpClient httpClient, HttpClient httpClient,
IndexingServiceSelector serviceProvider, ServerDiscoverySelector selector,
RetryPolicyFactory retryPolicyFactory, RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper ObjectMapper jsonMapper
) )
{ {
this.task = task; this.task = task;
this.httpClient = httpClient; this.httpClient = httpClient;
this.serviceProvider = serviceProvider; this.selector = selector;
this.retryPolicyFactory = retryPolicyFactory; this.retryPolicyFactory = retryPolicyFactory;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
} }
@ -127,7 +127,7 @@ public class RemoteTaskActionClient implements TaskActionClient
private URI getServiceUri() throws Exception private URI getServiceUri() throws Exception
{ {
final Server instance = serviceProvider.pick(); final Server instance = selector.pick();
if (instance == null) { if (instance == null) {
throw new ISE("Cannot find instance of indexer to talk to!"); throw new ISE("Cannot find instance of indexer to talk to!");
} }

View File

@ -22,7 +22,8 @@ package io.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import io.druid.client.indexing.IndexingServiceSelector; import io.druid.client.indexing.IndexingService;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
@ -32,20 +33,20 @@ import io.druid.indexing.common.task.Task;
public class RemoteTaskActionClientFactory implements TaskActionClientFactory public class RemoteTaskActionClientFactory implements TaskActionClientFactory
{ {
private final HttpClient httpClient; private final HttpClient httpClient;
private final IndexingServiceSelector serviceProvider; private final ServerDiscoverySelector selector;
private final RetryPolicyFactory retryPolicyFactory; private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
@Inject @Inject
public RemoteTaskActionClientFactory( public RemoteTaskActionClientFactory(
@Global HttpClient httpClient, @Global HttpClient httpClient,
IndexingServiceSelector serviceProvider, @IndexingService ServerDiscoverySelector selector,
RetryPolicyFactory retryPolicyFactory, RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper ObjectMapper jsonMapper
) )
{ {
this.httpClient = httpClient; this.httpClient = httpClient;
this.serviceProvider = serviceProvider; this.selector = selector;
this.retryPolicyFactory = retryPolicyFactory; this.retryPolicyFactory = retryPolicyFactory;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
} }
@ -53,6 +54,6 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory
@Override @Override
public TaskActionClient create(Task task) public TaskActionClient create(Task task)
{ {
return new RemoteTaskActionClient(task, httpClient, serviceProvider, retryPolicyFactory, jsonMapper); return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper);
} }
} }

View File

@ -26,8 +26,8 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.client.selector.DiscoverySelector;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -43,18 +43,18 @@ public class IndexingServiceClient
private final HttpClient client; private final HttpClient client;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final DiscoverySelector<Server> serviceProvider; private final ServerDiscoverySelector selector;
@Inject @Inject
public IndexingServiceClient( public IndexingServiceClient(
@Global HttpClient client, @Global HttpClient client,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
@IndexingService DiscoverySelector<Server> serviceProvider @IndexingService ServerDiscoverySelector selector
) )
{ {
this.client = client; this.client = client;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.serviceProvider = serviceProvider; this.selector = selector;
} }
public void mergeSegments(List<DataSegment> segments) public void mergeSegments(List<DataSegment> segments)
@ -106,7 +106,7 @@ public class IndexingServiceClient
private String baseUrl() private String baseUrl()
{ {
try { try {
final Server instance = serviceProvider.pick(); final Server instance = selector.pick();
if (instance == null) { if (instance == null) {
throw new ISE("Cannot find instance of indexingService"); throw new ISE("Cannot find instance of indexingService");
} }

View File

@ -59,7 +59,7 @@ import java.util.concurrent.ThreadFactory;
/** /**
* The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be * The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be
* automatically announced at the end of the lifecycle start. * automatically announced at the end of the lifecycle start.
* * <p/>
* In order for this to work a ServiceAnnouncer instance *must* be injected and instantiated first. * In order for this to work a ServiceAnnouncer instance *must* be injected and instantiated first.
* This can often be achieved by registering ServiceAnnouncer.class with the LifecycleModule. * This can often be achieved by registering ServiceAnnouncer.class with the LifecycleModule.
*/ */
@ -69,7 +69,7 @@ public class DiscoveryModule implements Module
/** /**
* Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle. * Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle.
* * <p/>
* That is, this module will announce the DruidNode instance returned by * That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class)) automatically. * injector.getInstance(Key.get(DruidNode.class)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle * Announcement will happen in the LAST stage of the Lifecycle
@ -81,7 +81,7 @@ public class DiscoveryModule implements Module
/** /**
* Requests that the annotated DruidNode instance be injected and published as part of the lifecycle. * Requests that the annotated DruidNode instance be injected and published as part of the lifecycle.
* * <p/>
* That is, this module will announce the DruidNode instance returned by * That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle * Announcement will happen in the LAST stage of the Lifecycle
@ -95,7 +95,7 @@ public class DiscoveryModule implements Module
/** /**
* Requests that the annotated DruidNode instance be injected and published as part of the lifecycle. * Requests that the annotated DruidNode instance be injected and published as part of the lifecycle.
* * <p/>
* That is, this module will announce the DruidNode instance returned by * That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle * Announcement will happen in the LAST stage of the Lifecycle
@ -109,7 +109,7 @@ public class DiscoveryModule implements Module
/** /**
* Requests that the keyed DruidNode instance be injected and published as part of the lifecycle. * Requests that the keyed DruidNode instance be injected and published as part of the lifecycle.
* * <p/>
* That is, this module will announce the DruidNode instance returned by * That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle * Announcement will happen in the LAST stage of the Lifecycle
@ -137,7 +137,9 @@ public class DiscoveryModule implements Module
.asEagerSingleton(); .asEagerSingleton();
} }
@Provides @LazySingleton @Named(NAME) @Provides
@LazySingleton
@Named(NAME)
public CuratorServiceAnnouncer getServiceAnnouncer( public CuratorServiceAnnouncer getServiceAnnouncer(
final CuratorServiceAnnouncer announcer, final CuratorServiceAnnouncer announcer,
final Injector injector, final Injector injector,
@ -181,7 +183,8 @@ public class DiscoveryModule implements Module
return announcer; return announcer;
} }
@Provides @LazySingleton @Provides
@LazySingleton
public ServiceDiscovery<Void> getServiceDiscovery( public ServiceDiscovery<Void> getServiceDiscovery(
CuratorFramework curator, CuratorFramework curator,
CuratorDiscoveryConfig config, CuratorDiscoveryConfig config,
@ -217,13 +220,21 @@ public class DiscoveryModule implements Module
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
}, }
Lifecycle.Stage.LAST
); );
return serviceDiscovery; return serviceDiscovery;
} }
@Provides
@LazySingleton
public ServerDiscoveryFactory getServerDiscoveryFactory(
ServiceDiscovery<Void> serviceDiscovery
)
{
return new ServerDiscoveryFactory(serviceDiscovery);
}
private static class NoopServiceDiscovery<T> implements ServiceDiscovery<T> private static class NoopServiceDiscovery<T> implements ServiceDiscovery<T>
{ {
@Override @Override

View File

@ -0,0 +1,72 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.curator.discovery;
import com.google.inject.Inject;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import java.io.IOException;
/**
*/
public class ServerDiscoveryFactory
{
private final ServiceDiscovery<Void> serviceDiscovery;
@Inject
public ServerDiscoveryFactory(ServiceDiscovery<Void> serviceDiscovery)
{
this.serviceDiscovery = serviceDiscovery;
}
public ServerDiscoverySelector createSelector(String serviceName)
{
if (serviceName == null) {
return new ServerDiscoverySelector(new NoopServiceProvider());
}
final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).build();
return new ServerDiscoverySelector(serviceProvider);
}
private static class NoopServiceProvider<T> implements ServiceProvider<T>
{
@Override
public void start() throws Exception
{
// do nothing
}
@Override
public ServiceInstance<T> getInstance() throws Exception
{
return null;
}
@Override
public void close() throws IOException
{
// do nothing
}
}
}

View File

@ -17,9 +17,8 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.client.indexing; package io.druid.curator.discovery;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -28,21 +27,18 @@ import io.druid.client.selector.Server;
import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.ServiceProvider;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
/** /**
*/ */
public class IndexingServiceSelector implements DiscoverySelector<Server> public class ServerDiscoverySelector implements DiscoverySelector<Server>
{ {
private static final Logger log = new Logger(IndexingServiceSelector.class); private static final Logger log = new Logger(ServerDiscoverySelector.class);
private final ServiceProvider serviceProvider; private final ServiceProvider serviceProvider;
@Inject public ServerDiscoverySelector(ServiceProvider serviceProvider)
public IndexingServiceSelector( {
@Nullable @IndexingService ServiceProvider serviceProvider
) {
this.serviceProvider = serviceProvider; this.serviceProvider = serviceProvider;
} }
@ -54,7 +50,12 @@ public class IndexingServiceSelector implements DiscoverySelector<Server>
instance = serviceProvider.getInstance(); instance = serviceProvider.getInstance();
} }
catch (Exception e) { catch (Exception e) {
log.info(e, ""); log.info(e, "Exception getting instance");
return null;
}
if (instance == null) {
log.error("No server instance found");
return null; return null;
} }

View File

@ -22,17 +22,10 @@ package io.druid.guice;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import io.druid.client.indexing.IndexingService; import io.druid.client.indexing.IndexingService;
import io.druid.client.indexing.IndexingServiceSelector;
import io.druid.client.indexing.IndexingServiceSelectorConfig; import io.druid.client.indexing.IndexingServiceSelectorConfig;
import io.druid.client.selector.DiscoverySelector; import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.client.selector.Server; import io.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import java.io.IOException;
/** /**
*/ */
@ -42,42 +35,16 @@ public class IndexingServiceDiscoveryModule implements Module
public void configure(Binder binder) public void configure(Binder binder)
{ {
JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class); JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class);
binder.bind(new TypeLiteral<DiscoverySelector<Server>>(){})
.annotatedWith(IndexingService.class)
.to(IndexingServiceSelector.class);
binder.bind(IndexingServiceSelector.class).in(ManageLifecycle.class);
} }
@Provides @Provides
@LazySingleton @IndexingService @IndexingService
public ServiceProvider getServiceProvider( @ManageLifecycle
public ServerDiscoverySelector getServiceProvider(
IndexingServiceSelectorConfig config, IndexingServiceSelectorConfig config,
ServiceDiscovery<Void> serviceDiscovery ServerDiscoveryFactory serverDiscoveryFactory
) )
{ {
if (config.getServiceName() == null) { return serverDiscoveryFactory.createSelector(config.getServiceName());
return new ServiceProvider()
{
@Override
public void start() throws Exception
{
}
@Override
public ServiceInstance getInstance() throws Exception
{
return null;
}
@Override
public void close() throws IOException
{
}
};
}
return serviceDiscovery.serviceProviderBuilder().serviceName(config.getServiceName()).build();
} }
} }

View File

@ -29,6 +29,10 @@ import java.util.List;
*/ */
public class ExtensionsConfig public class ExtensionsConfig
{ {
@JsonProperty
@NotNull
private boolean searchCurrentClassloader = true;
@JsonProperty @JsonProperty
@NotNull @NotNull
private List<String> coordinates = ImmutableList.of(); private List<String> coordinates = ImmutableList.of();
@ -44,6 +48,11 @@ public class ExtensionsConfig
"https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local" "https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local"
); );
public boolean searchCurrentClassloader()
{
return searchCurrentClassloader;
}
public List<String> getCoordinates() public List<String> getCoordinates()
{ {
return coordinates; return coordinates;

View File

@ -97,6 +97,13 @@ public class
final TeslaAether aether = getAetherClient(config); final TeslaAether aether = getAetherClient(config);
List<T> retVal = Lists.newArrayList(); List<T> retVal = Lists.newArrayList();
if (config.searchCurrentClassloader()) {
for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) {
log.info("Adding local module[%s]", module.getClass());
retVal.add(module);
}
}
for (String coordinate : config.getCoordinates()) { for (String coordinate : config.getCoordinates()) {
log.info("Loading extension[%s]", coordinate); log.info("Loading extension[%s]", coordinate);
try { try {
@ -139,14 +146,13 @@ public class
for (Artifact artifact : artifacts) { for (Artifact artifact : artifacts) {
if (!exclusions.contains(artifact.getGroupId())) { if (!exclusions.contains(artifact.getGroupId())) {
urls.add(artifact.getFile().toURI().toURL()); urls.add(artifact.getFile().toURI().toURL());
} } else {
else {
log.error("Skipped Artifact[%s]", artifact); log.error("Skipped Artifact[%s]", artifact);
} }
} }
for (URL url : urls) { for (URL url : urls) {
log.error("Added URL[%s]", url); log.info("Added URL[%s]", url);
} }
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader()); loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
@ -243,7 +249,8 @@ public class
private final ObjectMapper smileMapper; private final ObjectMapper smileMapper;
private final List<Module> modules; private final List<Module> modules;
public ModuleList(Injector baseInjector) { public ModuleList(Injector baseInjector)
{
this.baseInjector = baseInjector; this.baseInjector = baseInjector;
this.jsonMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Json.class)); this.jsonMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Json.class));
this.smileMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Smile.class)); this.smileMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Smile.class));
@ -260,24 +267,19 @@ public class
if (input instanceof DruidModule) { if (input instanceof DruidModule) {
baseInjector.injectMembers(input); baseInjector.injectMembers(input);
modules.add(registerJacksonModules(((DruidModule) input))); modules.add(registerJacksonModules(((DruidModule) input)));
} } else if (input instanceof Module) {
else if (input instanceof Module) {
baseInjector.injectMembers(input); baseInjector.injectMembers(input);
modules.add((Module) input); modules.add((Module) input);
} } else if (input instanceof Class) {
else if (input instanceof Class) {
if (DruidModule.class.isAssignableFrom((Class) input)) { if (DruidModule.class.isAssignableFrom((Class) input)) {
modules.add(registerJacksonModules(baseInjector.getInstance((Class<? extends DruidModule>) input))); modules.add(registerJacksonModules(baseInjector.getInstance((Class<? extends DruidModule>) input)));
} } else if (Module.class.isAssignableFrom((Class) input)) {
else if (Module.class.isAssignableFrom((Class) input)) {
modules.add(baseInjector.getInstance((Class<? extends Module>) input)); modules.add(baseInjector.getInstance((Class<? extends Module>) input));
return; return;
} } else {
else {
throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class); throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class);
} }
} } else {
else {
throw new ISE("Unknown module type[%s]", input.getClass()); throw new ISE("Unknown module type[%s]", input.getClass());
} }
} }