Merger: Service discovery for worker -> master communication

This commit is contained in:
Gian Merlino 2013-02-26 11:36:07 -08:00
parent 2427e81874
commit d8fbddb9d4
4 changed files with 90 additions and 17 deletions

View File

@ -1,25 +1,31 @@
package com.metamx.druid.merger.common.actions;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.curator.x.discovery.ServiceInstance;
import com.netflix.curator.x.discovery.ServiceProvider;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
public class RemoteTaskActionClient implements TaskActionClient
{
private final HttpClient httpClient;
private final ServiceProvider serviceProvider;
private final ObjectMapper jsonMapper;
private static final Logger log = new Logger(RemoteTaskActionClient.class);
public RemoteTaskActionClient(HttpClient httpClient, ObjectMapper jsonMapper)
public RemoteTaskActionClient(HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper)
{
this.httpClient = httpClient;
this.serviceProvider = serviceProvider;
this.jsonMapper = jsonMapper;
}
@ -34,20 +40,36 @@ public class RemoteTaskActionClient implements TaskActionClient
.go(new ToStringResponseHandler(Charsets.UTF_8))
.get();
// TODO Figure out how to check HTTP status code
if(response.equals("")) {
return null;
} else {
return jsonMapper.readValue(response, taskAction.getReturnTypeReference());
}
final Map<String, Object> responseDict = jsonMapper.readValue(
response,
new TypeReference<Map<String, Object>>() {}
);
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public URI getServiceUri() throws URISyntaxException
private URI getServiceUri() throws Exception
{
return new URI("http://localhost:8087/mmx/merger/v1/action");
final ServiceInstance instance = serviceProvider.getInstance();
final String scheme;
final String host;
final int port;
final String path = "/mmx/merger/v1/action";
host = instance.getAddress();
if (instance.getSslPort() != null && instance.getSslPort() > 0) {
scheme = "https";
port = instance.getSslPort();
} else {
scheme = "http";
port = instance.getPort();
}
return new URI(scheme, null, host, port, path, null, null);
}
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.http;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
@ -176,6 +177,9 @@ public class IndexerCoordinatorResource
public <T> Response doAction(final TaskAction<T> action)
{
final T ret = taskMasterLifecycle.getTaskToolbox().getTaskActionClient().submit(action);
return Response.ok().entity(ret).build();
final Map<String, Object> retMap = Maps.newHashMap();
retMap.put("result", ret);
return Response.ok().entity(retMap).build();
}
}

View File

@ -39,6 +39,9 @@ public abstract class WorkerConfig
@Config("druid.worker.version")
public abstract String getVersion();
@Config("druid.worker.masterService")
public abstract String getMasterService();
public int getCapacity()
{
return Runtime.getRuntime().availableProcessors() - 1;

View File

@ -35,6 +35,7 @@ import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusher;
@ -64,6 +65,8 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceProvider;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
@ -105,6 +108,8 @@ public class WorkerNode extends RegisteringNode
private WorkerConfig workerConfig = null;
private TaskToolbox taskToolbox = null;
private CuratorFramework curatorFramework = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceProvider coordinatorServiceProvider = null;
private WorkerCuratorCoordinator workerCuratorCoordinator = null;
private TaskMonitor taskMonitor = null;
private Server server = null;
@ -156,6 +161,18 @@ public class WorkerNode extends RegisteringNode
return this;
}
public WorkerNode setCoordinatorServiceProvider(ServiceProvider coordinatorServiceProvider)
{
this.coordinatorServiceProvider = coordinatorServiceProvider;
return this;
}
public WorkerNode setServiceDiscovery(ServiceDiscovery serviceDiscovery)
{
this.serviceDiscovery = serviceDiscovery;
return this;
}
public WorkerNode setWorkerCuratorCoordinator(WorkerCuratorCoordinator workerCuratorCoordinator)
{
this.workerCuratorCoordinator = workerCuratorCoordinator;
@ -175,10 +192,12 @@ public class WorkerNode extends RegisteringNode
initializeS3Service();
initializeMonitors();
initializeMergerConfig();
initializeCuratorFramework();
initializeServiceDiscovery();
initializeCoordinatorServiceProvider();
initializeTaskToolbox();
initializeJacksonInjections();
initializeJacksonSubtypes();
initializeCuratorFramework();
initializeCuratorCoordinator();
initializeTaskMonitor();
initializeServer();
@ -328,7 +347,7 @@ public class WorkerNode extends RegisteringNode
);
taskToolbox = new TaskToolbox(
taskConfig,
new RemoteTaskActionClient(httpClient, jsonMapper),
new RemoteTaskActionClient(httpClient, coordinatorServiceProvider, jsonMapper),
emitter,
s3Service,
dataSegmentPusher,
@ -340,11 +359,36 @@ public class WorkerNode extends RegisteringNode
public void initializeCuratorFramework() throws IOException
{
final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
curatorConfig,
lifecycle
);
if (curatorFramework == null) {
final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
curatorConfig,
lifecycle
);
}
}
public void initializeServiceDiscovery() throws Exception
{
if (serviceDiscovery == null) {
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework,
config,
lifecycle
);
}
}
public void initializeCoordinatorServiceProvider()
{
if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider(
workerConfig.getMasterService(),
serviceDiscovery,
lifecycle
);
}
}
public void initializeCuratorCoordinator()