Merge pull request #37 from Benky/master

RemoteTaskRunner#run should fail on task with taskId from tasks.keys set
This commit is contained in:
cheddar 2012-11-29 09:21:41 -08:00
commit 25a4caac62
2 changed files with 19 additions and 3 deletions

View File

@ -54,6 +54,7 @@ import org.joda.time.Period;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
@ -88,9 +89,9 @@ public class RemoteTaskRunner implements TaskRunner
private final ScalingStrategy strategy; private final ScalingStrategy strategy;
// all workers that exist in ZK // all workers that exist in ZK
private final ConcurrentHashMap<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>(); private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
// all tasks that are assigned or need to be assigned // all tasks that are assigned or need to be assigned
private final ConcurrentHashMap<String, TaskWrapper> tasks = new ConcurrentHashMap<String, TaskWrapper>(); private final Map<String, TaskWrapper> tasks = new ConcurrentHashMap<String, TaskWrapper>();
private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>(); private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>(); private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
@ -259,7 +260,7 @@ public class RemoteTaskRunner implements TaskRunner
@Override @Override
public void run(Task task, TaskContext context, TaskCallback callback) public void run(Task task, TaskContext context, TaskCallback callback)
{ {
if (tasks.contains(task.getId())) { if (tasks.containsKey(task.getId())) {
throw new ISE("Assigned a task[%s] that already exists, WTF is happening?!", task.getId()); throw new ISE("Assigned a task[%s] that already exists, WTF is happening?!", task.getId());
} }
TaskWrapper taskWrapper = new TaskWrapper( TaskWrapper taskWrapper = new TaskWrapper(

View File

@ -3,6 +3,7 @@ package com.metamx.druid.merger.coordinator;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
@ -44,6 +45,8 @@ import java.util.List;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import static junit.framework.Assert.fail;
/** /**
*/ */
public class RemoteTaskRunnerTest public class RemoteTaskRunnerTest
@ -133,6 +136,18 @@ public class RemoteTaskRunnerTest
); );
} }
@Test
public void testAlreadyExecutedTask() throws Exception
{
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
try {
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
fail("ISE expected");
} catch (ISE expected) {
}
}
@Test @Test
public void testRunTooMuchZKData() throws Exception public void testRunTooMuchZKData() throws Exception
{ {