more sensical logic around when to terminate worker nodes

This commit is contained in:
Fangjin Yang 2012-11-02 16:59:49 -07:00
parent a5c3eab2f1
commit e9dadcae49
2 changed files with 21 additions and 13 deletions

View File

@ -144,17 +144,17 @@ public class RemoteTaskRunner implements TaskRunner
);
// Schedule termination of worker nodes periodically
Period period = new Period(config.getTerminateResourcesPeriodMs());
PeriodGranularity granularity = new PeriodGranularity(period, null, null);
final long truncatedNow = granularity.truncate(new DateTime().getMillis());
Period period = new Period(config.getTerminateResourcesDuration());
PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null);
final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
ScheduledExecutors.scheduleAtFixedRate(
scheduledExec,
new Duration(
System.currentTimeMillis(),
granularity.next(truncatedNow) - config.getTerminateResourcesWindowMs()
startTime
),
new Duration(config.getTerminateResourcesPeriodMs()),
config.getTerminateResourcesDuration(),
new Runnable()
{
@Override
@ -588,12 +588,18 @@ public class RemoteTaskRunner implements TaskRunner
{
try {
final String statusPath = JOINER.join(config.getStatusPath(), workerId, taskId);
final String taskPath = JOINER.join(config.getTaskPath(), workerId, taskId);
cf.delete().guaranteed().forPath(statusPath);
}
catch (Exception e) {
log.warn("Tried to delete a status path that didn't exist! Must've gone away already?");
}
try {
final String taskPath = JOINER.join(config.getTaskPath(), workerId, taskId);
cf.delete().guaranteed().forPath(taskPath);
}
catch (Exception e) {
log.warn("Tried to delete a path that didn't exist! Must've gone away already!");
log.warn("Tried to delete a task path that didn't exist! Must've gone away already?");
}
}
}

View File

@ -20,6 +20,8 @@
package com.metamx.druid.merger.coordinator.config;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
@ -27,13 +29,13 @@ import org.skife.config.Default;
*/
public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
{
@Config("druid.indexer.terminateResources.periodMs")
@Default("3600000") // 1 hr
public abstract long getTerminateResourcesPeriodMs();
@Config("druid.indexer.terminateResources.duration")
@Default("PT1H")
public abstract Duration getTerminateResourcesDuration();
@Config("druid.indexer.terminateResources.windowMs")
@Default("300000") // 5 mins
public abstract long getTerminateResourcesWindowMs();
@Config("druid.indexer.terminateResources.originDateTime")
@Default("2012-01-01T00:55:00.000Z")
public abstract DateTime getTerminateResourcesOriginDateTime();
@Config("druid.indexer.minWorkerVersion")
public abstract String getMinWorkerVersion();