From 15843c39780a8c06f40216777a0de4e7a024dd3c Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 24 Sep 2013 10:36:26 -0700 Subject: [PATCH] refactor how server service discovery is done --- .../actions/RemoteTaskActionClient.java | 10 +-- .../RemoteTaskActionClientFactory.java | 11 +-- .../indexing/IndexingServiceClient.java | 10 +-- .../curator/discovery/DiscoveryModule.java | 8 +++ .../discovery/ServerDiscoveryFactory.java | 72 +++++++++++++++++++ .../discovery/ServerDiscoverySelector.java} | 23 +++--- .../guice/IndexingServiceDiscoveryModule.java | 47 ++---------- 7 files changed, 115 insertions(+), 66 deletions(-) create mode 100644 server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java rename server/src/main/java/io/druid/{client/indexing/IndexingServiceSelector.java => curator/discovery/ServerDiscoverySelector.java} (84%) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index b2613e13064..41639aa9989 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -27,8 +27,8 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.ToStringResponseHandler; -import io.druid.client.indexing.IndexingServiceSelector; import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -42,7 +42,7 @@ public class RemoteTaskActionClient implements TaskActionClient { private final Task task; private final HttpClient httpClient; - private final IndexingServiceSelector serviceProvider; + private final ServerDiscoverySelector selector; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @@ -51,14 +51,14 @@ public class RemoteTaskActionClient implements TaskActionClient public RemoteTaskActionClient( Task task, HttpClient httpClient, - IndexingServiceSelector serviceProvider, + ServerDiscoverySelector selector, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { this.task = task; this.httpClient = httpClient; - this.serviceProvider = serviceProvider; + this.selector = selector; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } @@ -127,7 +127,7 @@ public class RemoteTaskActionClient implements TaskActionClient private URI getServiceUri() throws Exception { - final Server instance = serviceProvider.pick(); + final Server instance = selector.pick(); if (instance == null) { throw new ISE("Cannot find instance of indexer to talk to!"); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java index 4ae53a595b9..033c1263ca3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java @@ -22,7 +22,8 @@ package io.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.metamx.http.client.HttpClient; -import io.druid.client.indexing.IndexingServiceSelector; +import io.druid.client.indexing.IndexingService; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -32,20 +33,20 @@ import io.druid.indexing.common.task.Task; public class RemoteTaskActionClientFactory implements TaskActionClientFactory { private final HttpClient httpClient; - private final IndexingServiceSelector serviceProvider; + private final ServerDiscoverySelector selector; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @Inject public RemoteTaskActionClientFactory( @Global HttpClient httpClient, - IndexingServiceSelector serviceProvider, + @IndexingService ServerDiscoverySelector selector, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { this.httpClient = httpClient; - this.serviceProvider = serviceProvider; + this.selector = selector; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } @@ -53,6 +54,6 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory @Override public TaskActionClient create(Task task) { - return new RemoteTaskActionClient(task, httpClient, serviceProvider, retryPolicyFactory, jsonMapper); + return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper); } } diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 628ed978c42..abae59ee168 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -26,8 +26,8 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.InputStreamResponseHandler; -import io.druid.client.selector.DiscoverySelector; import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -43,18 +43,18 @@ public class IndexingServiceClient private final HttpClient client; private final ObjectMapper jsonMapper; - private final DiscoverySelector serviceProvider; + private final ServerDiscoverySelector selector; @Inject public IndexingServiceClient( @Global HttpClient client, ObjectMapper jsonMapper, - @IndexingService DiscoverySelector serviceProvider + @IndexingService ServerDiscoverySelector selector ) { this.client = client; this.jsonMapper = jsonMapper; - this.serviceProvider = serviceProvider; + this.selector = selector; } public void mergeSegments(List segments) @@ -106,7 +106,7 @@ public class IndexingServiceClient private String baseUrl() { try { - final Server instance = serviceProvider.pick(); + final Server instance = selector.pick(); if (instance == null) { throw new ISE("Cannot find instance of indexingService"); } diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 3f577f2d64f..88fcdb3c299 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -224,6 +224,14 @@ public class DiscoveryModule implements Module return serviceDiscovery; } + @Provides @LazySingleton + public ServerDiscoveryFactory getServerDiscoveryFactory( + ServiceDiscovery serviceDiscovery + ) + { + return new ServerDiscoveryFactory(serviceDiscovery); + } + private static class NoopServiceDiscovery implements ServiceDiscovery { @Override diff --git a/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java new file mode 100644 index 00000000000..c436289e70e --- /dev/null +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java @@ -0,0 +1,72 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.curator.discovery; + +import com.google.inject.Inject; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceProvider; + +import java.io.IOException; + +/** + */ +public class ServerDiscoveryFactory +{ + private final ServiceDiscovery serviceDiscovery; + + @Inject + public ServerDiscoveryFactory(ServiceDiscovery serviceDiscovery) + { + this.serviceDiscovery = serviceDiscovery; + } + + public ServerDiscoverySelector createSelector(String serviceName) + { + if (serviceName == null) { + return new ServerDiscoverySelector(new NoopServiceProvider()); + } + + final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).build(); + return new ServerDiscoverySelector(serviceProvider); + } + + private static class NoopServiceProvider implements ServiceProvider + { + @Override + public void start() throws Exception + { + // do nothing + } + + @Override + public ServiceInstance getInstance() throws Exception + { + return null; + } + + @Override + public void close() throws IOException + { + // do nothing + } + } + +} diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java similarity index 84% rename from server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java rename to server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java index 7c5eaaf8d72..b7cd72abf17 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java @@ -17,9 +17,8 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.client.indexing; +package io.druid.curator.discovery; -import com.google.inject.Inject; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; @@ -28,21 +27,18 @@ import io.druid.client.selector.Server; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; -import javax.annotation.Nullable; import java.io.IOException; /** -*/ -public class IndexingServiceSelector implements DiscoverySelector + */ +public class ServerDiscoverySelector implements DiscoverySelector { - private static final Logger log = new Logger(IndexingServiceSelector.class); + private static final Logger log = new Logger(ServerDiscoverySelector.class); private final ServiceProvider serviceProvider; - @Inject - public IndexingServiceSelector( - @Nullable @IndexingService ServiceProvider serviceProvider - ) { + public ServerDiscoverySelector(ServiceProvider serviceProvider) + { this.serviceProvider = serviceProvider; } @@ -54,7 +50,12 @@ public class IndexingServiceSelector implements DiscoverySelector instance = serviceProvider.getInstance(); } catch (Exception e) { - log.info(e, ""); + log.info(e, "Exception getting instance"); + return null; + } + + if (instance == null) { + log.error("No server instance found"); return null; } diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index 60e0c15281b..10535bb9d87 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -22,17 +22,10 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; -import com.google.inject.TypeLiteral; import io.druid.client.indexing.IndexingService; -import io.druid.client.indexing.IndexingServiceSelector; import io.druid.client.indexing.IndexingServiceSelectorConfig; -import io.druid.client.selector.DiscoverySelector; -import io.druid.client.selector.Server; -import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; - -import java.io.IOException; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; /** */ @@ -42,42 +35,16 @@ public class IndexingServiceDiscoveryModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class); - binder.bind(new TypeLiteral>(){}) - .annotatedWith(IndexingService.class) - .to(IndexingServiceSelector.class); - - binder.bind(IndexingServiceSelector.class).in(ManageLifecycle.class); } @Provides - @LazySingleton @IndexingService - public ServiceProvider getServiceProvider( + @IndexingService + @ManageLifecycle + public ServerDiscoverySelector getServiceProvider( IndexingServiceSelectorConfig config, - ServiceDiscovery serviceDiscovery + ServerDiscoveryFactory serverDiscoveryFactory ) { - if (config.getServiceName() == null) { - return new ServiceProvider() - { - @Override - public void start() throws Exception - { - - } - - @Override - public ServiceInstance getInstance() throws Exception - { - return null; - } - - @Override - public void close() throws IOException - { - - } - }; - } - return serviceDiscovery.serviceProviderBuilder().serviceName(config.getServiceName()).build(); + return serverDiscoveryFactory.createSelector(config.getServiceName()); } }