make RTR idempotent to multiple run requests for same task, because higher level things in the indexing service require this behaviour

This commit is contained in:
fjy 2013-08-05 14:44:01 -07:00
parent fbb1211cbc
commit 35f89d7232
3 changed files with 24 additions and 17 deletions

View File

@ -265,9 +265,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
task.getId(),
new RemoteTaskRunnerWorkItem(task, SettableFuture.<TaskStatus>create())
);
} else {
log.info("Bootstrap didn't find %s running. Running it again", task.getId());
run(task);
}
}
}
@ -284,8 +281,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) {
throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId());
RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId());
if (runningTask != null) {
log.info("Assigned a task[%s] that is already running, not doing anything", task.getId());
return runningTask.getResult();
}
RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
if (pendingTask != null) {
log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
return pendingTask.getResult();
}
RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
task,
@ -686,9 +690,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
}
);
sortedWorkers.addAll(zkWorkers.values());
final String configMinWorkerVer = workerSetupData.get().getMinVersion();
final String minWorkerVer = configMinWorkerVer == null ? config.getWorkerVersion() : configMinWorkerVer;
for (ZkWorker zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) &&
zkWorker.getWorker().getVersion().compareTo(workerSetupData.get().getMinVersion()) >= 0) {
zkWorker.getWorker().getVersion().compareTo(minWorkerVer) >= 0) {
return zkWorker;
}
}

View File

@ -23,6 +23,7 @@ import com.metamx.druid.indexing.common.config.IndexerZkConfig;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
import org.skife.config.DefaultNull;
/**
*/
@ -35,4 +36,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
@Config("druid.curator.compression.enable")
@Default("false")
public abstract boolean enableCompression();
@Config("druid.indexer.worker.version")
@DefaultNull
public abstract String getWorkerVersion();
}

View File

@ -109,17 +109,6 @@ public class RemoteTaskRunnerTest
remoteTaskRunner.run(task);
}
@Test(expected = ISE.class)
public void testExceptionThrownWithExistingTask() throws Exception
{
doSetup();
remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
remoteTaskRunner.run(
makeTask(TaskStatus.running("task"))
);
}
@Test
public void testRunTooMuchZKData() throws Exception
{
@ -415,5 +404,11 @@ public class RemoteTaskRunnerTest
{
return 1000;
}
@Override
public String getWorkerVersion()
{
return "";
}
}
}