mirror of https://github.com/apache/druid.git
refactor how server service discovery is done
This commit is contained in:
parent
1ff04412a2
commit
15843c3978
|
@ -27,8 +27,8 @@ import com.metamx.common.ISE;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.ToStringResponseHandler;
|
||||
import io.druid.client.indexing.IndexingServiceSelector;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.indexing.common.RetryPolicy;
|
||||
import io.druid.indexing.common.RetryPolicyFactory;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
|
@ -42,7 +42,7 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
{
|
||||
private final Task task;
|
||||
private final HttpClient httpClient;
|
||||
private final IndexingServiceSelector serviceProvider;
|
||||
private final ServerDiscoverySelector selector;
|
||||
private final RetryPolicyFactory retryPolicyFactory;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
|
@ -51,14 +51,14 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
public RemoteTaskActionClient(
|
||||
Task task,
|
||||
HttpClient httpClient,
|
||||
IndexingServiceSelector serviceProvider,
|
||||
ServerDiscoverySelector selector,
|
||||
RetryPolicyFactory retryPolicyFactory,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.task = task;
|
||||
this.httpClient = httpClient;
|
||||
this.serviceProvider = serviceProvider;
|
||||
this.selector = selector;
|
||||
this.retryPolicyFactory = retryPolicyFactory;
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
@ -127,7 +127,7 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
|
||||
private URI getServiceUri() throws Exception
|
||||
{
|
||||
final Server instance = serviceProvider.pick();
|
||||
final Server instance = selector.pick();
|
||||
if (instance == null) {
|
||||
throw new ISE("Cannot find instance of indexer to talk to!");
|
||||
}
|
||||
|
|
|
@ -22,7 +22,8 @@ package io.druid.indexing.common.actions;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.indexing.IndexingServiceSelector;
|
||||
import io.druid.client.indexing.IndexingService;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.indexing.common.RetryPolicyFactory;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
|
@ -32,20 +33,20 @@ import io.druid.indexing.common.task.Task;
|
|||
public class RemoteTaskActionClientFactory implements TaskActionClientFactory
|
||||
{
|
||||
private final HttpClient httpClient;
|
||||
private final IndexingServiceSelector serviceProvider;
|
||||
private final ServerDiscoverySelector selector;
|
||||
private final RetryPolicyFactory retryPolicyFactory;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
@Inject
|
||||
public RemoteTaskActionClientFactory(
|
||||
@Global HttpClient httpClient,
|
||||
IndexingServiceSelector serviceProvider,
|
||||
@IndexingService ServerDiscoverySelector selector,
|
||||
RetryPolicyFactory retryPolicyFactory,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.httpClient = httpClient;
|
||||
this.serviceProvider = serviceProvider;
|
||||
this.selector = selector;
|
||||
this.retryPolicyFactory = retryPolicyFactory;
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
@ -53,6 +54,6 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory
|
|||
@Override
|
||||
public TaskActionClient create(Task task)
|
||||
{
|
||||
return new RemoteTaskActionClient(task, httpClient, serviceProvider, retryPolicyFactory, jsonMapper);
|
||||
return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,8 +26,8 @@ import com.metamx.common.IAE;
|
|||
import com.metamx.common.ISE;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.InputStreamResponseHandler;
|
||||
import io.druid.client.selector.DiscoverySelector;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.joda.time.Interval;
|
||||
|
@ -43,18 +43,18 @@ public class IndexingServiceClient
|
|||
|
||||
private final HttpClient client;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final DiscoverySelector<Server> serviceProvider;
|
||||
private final ServerDiscoverySelector selector;
|
||||
|
||||
@Inject
|
||||
public IndexingServiceClient(
|
||||
@Global HttpClient client,
|
||||
ObjectMapper jsonMapper,
|
||||
@IndexingService DiscoverySelector<Server> serviceProvider
|
||||
@IndexingService ServerDiscoverySelector selector
|
||||
)
|
||||
{
|
||||
this.client = client;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.serviceProvider = serviceProvider;
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
public void mergeSegments(List<DataSegment> segments)
|
||||
|
@ -106,7 +106,7 @@ public class IndexingServiceClient
|
|||
private String baseUrl()
|
||||
{
|
||||
try {
|
||||
final Server instance = serviceProvider.pick();
|
||||
final Server instance = selector.pick();
|
||||
if (instance == null) {
|
||||
throw new ISE("Cannot find instance of indexingService");
|
||||
}
|
||||
|
|
|
@ -224,6 +224,14 @@ public class DiscoveryModule implements Module
|
|||
return serviceDiscovery;
|
||||
}
|
||||
|
||||
@Provides @LazySingleton
|
||||
public ServerDiscoveryFactory getServerDiscoveryFactory(
|
||||
ServiceDiscovery<Void> serviceDiscovery
|
||||
)
|
||||
{
|
||||
return new ServerDiscoveryFactory(serviceDiscovery);
|
||||
}
|
||||
|
||||
private static class NoopServiceDiscovery<T> implements ServiceDiscovery<T>
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.curator.discovery;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.curator.x.discovery.ServiceDiscovery;
|
||||
import org.apache.curator.x.discovery.ServiceInstance;
|
||||
import org.apache.curator.x.discovery.ServiceProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ServerDiscoveryFactory
|
||||
{
|
||||
private final ServiceDiscovery<Void> serviceDiscovery;
|
||||
|
||||
@Inject
|
||||
public ServerDiscoveryFactory(ServiceDiscovery<Void> serviceDiscovery)
|
||||
{
|
||||
this.serviceDiscovery = serviceDiscovery;
|
||||
}
|
||||
|
||||
public ServerDiscoverySelector createSelector(String serviceName)
|
||||
{
|
||||
if (serviceName == null) {
|
||||
return new ServerDiscoverySelector(new NoopServiceProvider());
|
||||
}
|
||||
|
||||
final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).build();
|
||||
return new ServerDiscoverySelector(serviceProvider);
|
||||
}
|
||||
|
||||
private static class NoopServiceProvider<T> implements ServiceProvider<T>
|
||||
{
|
||||
@Override
|
||||
public void start() throws Exception
|
||||
{
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceInstance<T> getInstance() throws Exception
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -17,9 +17,8 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.indexing;
|
||||
package io.druid.curator.discovery;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
@ -28,21 +27,18 @@ import io.druid.client.selector.Server;
|
|||
import org.apache.curator.x.discovery.ServiceInstance;
|
||||
import org.apache.curator.x.discovery.ServiceProvider;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class IndexingServiceSelector implements DiscoverySelector<Server>
|
||||
*/
|
||||
public class ServerDiscoverySelector implements DiscoverySelector<Server>
|
||||
{
|
||||
private static final Logger log = new Logger(IndexingServiceSelector.class);
|
||||
private static final Logger log = new Logger(ServerDiscoverySelector.class);
|
||||
|
||||
private final ServiceProvider serviceProvider;
|
||||
|
||||
@Inject
|
||||
public IndexingServiceSelector(
|
||||
@Nullable @IndexingService ServiceProvider serviceProvider
|
||||
) {
|
||||
public ServerDiscoverySelector(ServiceProvider serviceProvider)
|
||||
{
|
||||
this.serviceProvider = serviceProvider;
|
||||
}
|
||||
|
||||
|
@ -54,7 +50,12 @@ public class IndexingServiceSelector implements DiscoverySelector<Server>
|
|||
instance = serviceProvider.getInstance();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.info(e, "");
|
||||
log.info(e, "Exception getting instance");
|
||||
return null;
|
||||
}
|
||||
|
||||
if (instance == null) {
|
||||
log.error("No server instance found");
|
||||
return null;
|
||||
}
|
||||
|
|
@ -22,17 +22,10 @@ package io.druid.guice;
|
|||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import io.druid.client.indexing.IndexingService;
|
||||
import io.druid.client.indexing.IndexingServiceSelector;
|
||||
import io.druid.client.indexing.IndexingServiceSelectorConfig;
|
||||
import io.druid.client.selector.DiscoverySelector;
|
||||
import io.druid.client.selector.Server;
|
||||
import org.apache.curator.x.discovery.ServiceDiscovery;
|
||||
import org.apache.curator.x.discovery.ServiceInstance;
|
||||
import org.apache.curator.x.discovery.ServiceProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -42,42 +35,16 @@ public class IndexingServiceDiscoveryModule implements Module
|
|||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class);
|
||||
binder.bind(new TypeLiteral<DiscoverySelector<Server>>(){})
|
||||
.annotatedWith(IndexingService.class)
|
||||
.to(IndexingServiceSelector.class);
|
||||
|
||||
binder.bind(IndexingServiceSelector.class).in(ManageLifecycle.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@LazySingleton @IndexingService
|
||||
public ServiceProvider getServiceProvider(
|
||||
@IndexingService
|
||||
@ManageLifecycle
|
||||
public ServerDiscoverySelector getServiceProvider(
|
||||
IndexingServiceSelectorConfig config,
|
||||
ServiceDiscovery<Void> serviceDiscovery
|
||||
ServerDiscoveryFactory serverDiscoveryFactory
|
||||
)
|
||||
{
|
||||
if (config.getServiceName() == null) {
|
||||
return new ServiceProvider()
|
||||
{
|
||||
@Override
|
||||
public void start() throws Exception
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceInstance getInstance() throws Exception
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
return serviceDiscovery.serviceProviderBuilder().serviceName(config.getServiceName()).build();
|
||||
return serverDiscoveryFactory.createSelector(config.getServiceName());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue