intiial commit for stop task

This commit is contained in:
Fangjin Yang 2013-03-19 11:30:33 -07:00
parent b9de751b32
commit 30fdb2956d
16 changed files with 403 additions and 23 deletions

View File

@ -101,6 +101,11 @@ public abstract class AbstractTask implements Task
return TaskStatus.running(id);
}
@Override
public void shutdown()
{
}
@Override
public String toString()
{

View File

@ -21,8 +21,8 @@ import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.GracefulShutdownFirehose;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema;
@ -58,6 +58,9 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private volatile Plumber plumber = null;
@JsonIgnore
private volatile GracefulShutdownFirehose firehose = null;
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
@JsonCreator
@ -123,7 +126,7 @@ public class RealtimeIndexTask extends AbstractTask
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
final Firehose firehose = firehoseFactory.connect();
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
// TODO -- the ServerView, which seems kind of odd?)
@ -156,7 +159,8 @@ public class RealtimeIndexTask extends AbstractTask
{
try {
toolbox.getSegmentAnnouncer().unannounceSegment(segment);
} finally {
}
finally {
toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval()));
}
}
@ -179,7 +183,8 @@ public class RealtimeIndexTask extends AbstractTask
.submit(new LockAcquireAction(interval));
return myLock.getVersion();
} catch (IOException e) {
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@ -207,6 +212,9 @@ public class RealtimeIndexTask extends AbstractTask
final InputRow inputRow;
try {
inputRow = firehose.nextRow();
if (inputRow == null) {
continue;
}
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
@ -251,7 +259,8 @@ public class RealtimeIndexTask extends AbstractTask
try {
plumber.persist(firehose.commit());
plumber.finishJob();
} catch(Exception e) {
}
catch (Exception e) {
log.makeAlert(e, "Failed to finish realtime task").emit();
}
}
@ -260,6 +269,17 @@ public class RealtimeIndexTask extends AbstractTask
return TaskStatus.success(getId());
}
@Override
public void shutdown()
{
try {
firehose.shutdown();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@JsonProperty
public Schema getSchema()
{

View File

@ -118,4 +118,6 @@ public interface Task
* @throws Exception
*/
public TaskStatus run(TaskToolbox toolbox) throws Exception;
public void shutdown();
}

View File

@ -192,4 +192,9 @@ public class LocalTaskRunner implements TaskRunner
);
}
}
@Override
public void shutdown(String taskId)
{
}
}

View File

@ -19,7 +19,9 @@
package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
@ -31,6 +33,7 @@ import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
@ -39,6 +42,8 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
@ -46,7 +51,11 @@ import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import com.netflix.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@ -85,6 +94,7 @@ public class RemoteTaskRunner implements TaskRunner
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
private final AtomicReference<WorkerSetupData> workerSetupData;
private final HttpClient httpClient;
// all workers that exist in ZK
private final Map<String, ZkWorker> zkWorkers = new ConcurrentHashMap<String, ZkWorker>();
@ -106,7 +116,8 @@ public class RemoteTaskRunner implements TaskRunner
PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory,
AtomicReference<WorkerSetupData> workerSetupData
AtomicReference<WorkerSetupData> workerSetupData,
HttpClient httpClient
)
{
this.jsonMapper = jsonMapper;
@ -116,6 +127,7 @@ public class RemoteTaskRunner implements TaskRunner
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
this.workerSetupData = workerSetupData;
this.httpClient = httpClient;
}
@LifecycleStart
@ -234,6 +246,54 @@ public class RemoteTaskRunner implements TaskRunner
runPendingTasks();
}
@Override
public void shutdown(String taskId)
{
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
if (zkWorker == null) {
log.info("Can't shutdown! No worker running task %s", taskId);
return;
}
final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
URL url;
try {
url = new URL(String.format("http://%s/mmx/v1/worker/shutdown", zkWorker.getWorker().getHost()));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
while (!retryPolicy.hasExceededRetryThreshold()) {
try {
final String response = httpClient.post(url)
.setContent("application/json", jsonMapper.writeValueAsBytes(taskId))
.go(new ToStringResponseHandler(Charsets.UTF_8))
.get();
log.info("Sent shutdown message to worker: %s, response: %s", zkWorker.getWorker().getHost(), response);
return;
}
catch (Exception e) {
log.error(e, "Exception shutting down taskId: %s", taskId);
if (retryPolicy.hasExceededRetryThreshold()) {
throw Throwables.propagate(e);
} else {
try {
final long sleepTime = retryPolicy.getAndIncrementRetryDelay().getMillis();
log.info("Will try again in %s.", new Duration(sleepTime).toString());
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
throw Throwables.propagate(e2);
}
}
}
}
}
/**
* This method uses a single threaded executor to extract all pending tasks and attempt to run them. Any tasks that
* are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
@ -571,9 +631,4 @@ public class RemoteTaskRunner implements TaskRunner
throw Throwables.propagate(e);
}
}
public static void main(String[] args)
{
System.out.println("2013-03-11".compareTo("0"));
}
}

View File

@ -44,4 +44,6 @@ public interface TaskRunner
public Collection<TaskRunnerWorkItem> getPendingTasks();
public Collection<ZkWorker> getWorkers();
public void shutdown(String taskId);
}

View File

@ -196,13 +196,29 @@ public class IndexerCoordinatorResource
.submit(holder.getAction());
retMap = Maps.newHashMap();
retMap.put("result", ret);
} catch(IOException e) {
}
catch (IOException e) {
return Response.serverError().build();
}
return Response.ok().entity(retMap).build();
}
@POST
@Path("/shutdown")
@Produces("application/json")
public Response doShutdown(final String taskId)
{
try {
taskMasterLifecycle.getTaskRunner().shutdown(taskId);
}
catch (Exception e) {
return Response.serverError().build();
}
return Response.ok().build();
}
@GET
@Path("/pendingTasks")
@Produces("application/json")

View File

@ -28,12 +28,19 @@ public interface AutoScalingStrategy<T>
{
public AutoScalingData<T> provision();
public AutoScalingData<T> terminate(List<String> ids);
public AutoScalingData<T> terminate(List<String> ips);
/**
* Provides a lookup of ip addresses to node ids
* @param ips - nodes ips
* @param ips - nodes IPs
* @return node ids
*/
public List<String> ipToIdLookup(List<String> ips);
/**
* Provides a lookup of node ids to ip addresses
* @param nodeIds - nodes ids
* @return IPs associated with the node
*/
public List<String> idToIpLookup(List<String> nodeIds);
}

View File

@ -128,16 +128,16 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
}
@Override
public AutoScalingData<Instance> terminate(List<String> ids)
public AutoScalingData<Instance> terminate(List<String> ips)
{
if (ids.isEmpty()) {
if (ips.isEmpty()) {
return new AutoScalingData<Instance>(Lists.<String>newArrayList(), Lists.<Instance>newArrayList());
}
DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest()
.withFilters(
new Filter("private-ip-address", ids)
new Filter("private-ip-address", ips)
)
);
@ -166,7 +166,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
return new AutoScalingData<Instance>(
Lists.transform(
ids,
ips,
new Function<String, String>()
{
@Override
@ -217,4 +217,36 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
return retVal;
}
@Override
public List<String> idToIpLookup(List<String> nodeIds)
{
DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest()
.withFilters(
new Filter("instance-id", nodeIds)
)
);
List<Instance> instances = Lists.newArrayList();
for (Reservation reservation : result.getReservations()) {
instances.addAll(reservation.getInstances());
}
List<String> retVal = Lists.transform(
instances,
new Function<Instance, String>()
{
@Override
public String apply(Instance input)
{
return input.getPrivateIpAddress();
}
}
);
log.info("Performing lookup: %s --> %s", nodeIds, retVal);
return retVal;
}
}

View File

@ -38,9 +38,9 @@ public class NoopAutoScalingStrategy implements AutoScalingStrategy<String>
}
@Override
public AutoScalingData<String> terminate(List<String> nodeIds)
public AutoScalingData<String> terminate(List<String> ips)
{
log.info("If I were a real strategy I'd terminate %s now", nodeIds);
log.info("If I were a real strategy I'd terminate %s now", ips);
return null;
}
@ -50,4 +50,11 @@ public class NoopAutoScalingStrategy implements AutoScalingStrategy<String>
log.info("I'm not a real strategy so I'm returning what I got %s", ips);
return ips;
}
@Override
public List<String> idToIpLookup(List<String> nodeIds)
{
log.info("I'm not a real strategy so I'm returning what I got %s", nodeIds);
return nodeIds;
}
}

View File

@ -116,6 +116,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
autoScalingStrategy.terminate(nodeIps);
currentlyProvisioning.clear();
}
}

View File

@ -0,0 +1,40 @@
package com.metamx.druid.merger.worker.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.control.TaskControl;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
/**
*/
@Path("/mmx/worker/v1")
public class WorkerResource
{
private static final Logger log = new Logger(WorkerResource.class);
private final ObjectMapper jsonMapper;
@Inject
public WorkerResource(
ObjectMapper jsonMapper
) throws Exception
{
this.jsonMapper = jsonMapper;
}
@POST
@Path("/control")
@Consumes("application/json")
@Produces("application/json")
public Response doControl(final TaskControl control)
{
// TODO
return Response.ok().build();
}
}

View File

@ -32,7 +32,7 @@ public class TestAutoScalingStrategy<T> implements AutoScalingStrategy<T>
}
@Override
public AutoScalingData<T> terminate(List<String> ids)
public AutoScalingData<T> terminate(List<String> ips)
{
return null;
}
@ -42,4 +42,10 @@ public class TestAutoScalingStrategy<T> implements AutoScalingStrategy<T>
{
return null;
}
@Override
public List<String> idToIpLookup(List<String> nodeIds)
{
return null;
}
}

View File

@ -0,0 +1,111 @@
package com.metamx.druid.realtime;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.RejectionPolicy;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class GracefulShutdownFirehose implements Firehose
{
private final Firehose firehose;
private final IndexGranularity segmentGranularity;
private final long windowMillis;
private final ScheduledExecutorService scheduledExecutor;
private final RejectionPolicy rejectionPolicy;
private volatile boolean shutdown = false;
public GracefulShutdownFirehose(
Firehose firehose,
IndexGranularity segmentGranularity,
Period windowPeriod
)
{
this.firehose = firehose;
this.segmentGranularity = segmentGranularity;
this.windowMillis = windowPeriod.toStandardDuration().getMillis() * 2;
this.scheduledExecutor = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("firehose_scheduled_%d")
.build()
);
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long end = segmentGranularity.increment(truncatedNow);
this.rejectionPolicy = new IntervalRejectionPolicyFactory(new Interval(truncatedNow, end)).create(windowPeriod);
}
public void shutdown() throws IOException
{
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
ScheduledExecutors.scheduleWithFixedDelay(
scheduledExecutor,
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call() throws Exception
{
try {
firehose.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ScheduledExecutors.Signal.STOP;
}
}
);
shutdown = true;
}
@Override
public boolean hasMore()
{
return firehose.hasMore();
}
@Override
public InputRow nextRow()
{
InputRow next = firehose.nextRow();
if (!shutdown || rejectionPolicy.accept(next.getTimestampFromEpoch())) {
return next;
}
return null;
}
@Override
public Runnable commit()
{
return firehose.commit();
}
@Override
public void close() throws IOException
{
firehose.close();
}
}

View File

@ -0,0 +1,42 @@
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
/**
*/
public class IntervalRejectionPolicyFactory implements RejectionPolicyFactory
{
private final Interval interval;
public IntervalRejectionPolicyFactory(Interval interval)
{
this.interval = interval;
}
@Override
public RejectionPolicy create(Period windowPeriod)
{
return new RejectionPolicy()
{
@Override
public DateTime getCurrMaxTime()
{
return new DateTime();
}
@Override
public boolean accept(long timestamp)
{
return interval.contains(timestamp);
}
@Override
public String toString()
{
return String.format("interval-%s", interval);
}
};
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@ -230,7 +231,7 @@ public class RealtimePlumberSchool implements PlumberSchool
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
private final QueryToolChest<T,Query<T>> toolchest = factory.getToolchest();
private final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
@ -303,7 +304,32 @@ public class RealtimePlumberSchool implements PlumberSchool
@Override
public void finishJob()
{
stopped = true;
log.info("Shutting down....");
while (!sinks.isEmpty()) {
try {
log.info(
"Cannot shut down yet! Sinks for %s remain!",
Joiner.on(", ").join(
Iterables.transform(
sinks.values(),
new Function<Sink, DataSegment>()
{
@Override
public DataSegment apply(Sink input)
{
return input.getSegment();
}
}
)
)
);
Thread.sleep(60000);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
for (final Sink sink : sinks.values()) {
try {
@ -322,6 +348,8 @@ public class RealtimePlumberSchool implements PlumberSchool
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
}
stopped = true;
}
private void initializeExecutors()