From e950c6333dcbf187fa6a70341e0f0c6156758ddc Mon Sep 17 00:00:00 2001
From: Richard Benkovsky
Date: Thu, 29 Nov 2012 09:42:28 +0100
Subject: [PATCH] RemoteTaskRunner#run should fail on task with taskId from tasks.keys set

---
 .../merger/coordinator/RemoteTaskRunner.java     |  7 ++++---
 .../merger/coordinator/RemoteTaskRunnerTest.java | 15 +++++++++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index 18225ec4a38..15d7db56084 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -54,6 +54,7 @@ import org.joda.time.Period;
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -88,9 +89,9 @@ public class RemoteTaskRunner implements TaskRunner
   private final ScalingStrategy strategy;
 
   // all workers that exist in ZK
-  private final ConcurrentHashMap zkWorkers = new ConcurrentHashMap();
+  private final Map zkWorkers = new ConcurrentHashMap();
   // all tasks that are assigned or need to be assigned
-  private final ConcurrentHashMap tasks = new ConcurrentHashMap();
+  private final Map tasks = new ConcurrentHashMap();
 
   private final ConcurrentSkipListSet currentlyProvisioning = new ConcurrentSkipListSet();
   private final ConcurrentSkipListSet currentlyTerminating = new ConcurrentSkipListSet();
@@ -259,7 +260,7 @@ public class RemoteTaskRunner implements TaskRunner
   @Override
   public void run(Task task, TaskContext context, TaskCallback callback)
   {
-    if (tasks.contains(task.getId())) {
+    if (tasks.containsKey(task.getId())) {
       throw new ISE("Assigned a task[%s] that already exists, WTF is happening?!", task.getId());
     }
     TaskWrapper taskWrapper = new TaskWrapper(
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
index aca8eef5114..eb10731abd9 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
@@ -3,6 +3,7 @@ package com.metamx.druid.merger.coordinator;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.metamx.common.ISE;
 import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.jackson.DefaultObjectMapper;
@@ -44,6 +45,8 @@ import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static junit.framework.Assert.fail;
+
 /**
  */
 public class RemoteTaskRunnerTest
@@ -133,6 +136,18 @@
     );
   }
 
+  @Test
+  public void testAlreadyExecutedTask() throws Exception
+  {
+    remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null);
+    try {
+      remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.newHashSet()), null);
+      fail("ISE expected");
+    } catch (ISE expected) {
+
+    }
+  }
+
   @Test
   public void testRunTooMuchZKData() throws Exception
   {
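
Note (not part of the patch): the guard in RemoteTaskRunner#run never fired before this change because ConcurrentHashMap.contains(Object) is a legacy alias for containsValue(Object), kept for Hashtable compatibility, so passing a task id (a key) to it checks the map's values rather than its keys. The minimal standalone sketch below, with an illustrative String/Object map standing in for the real task id to TaskWrapper mapping, shows the difference and why narrowing the field's declared type to Map also helps: the Map interface does not expose the legacy contains method at all.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; "task1" and the Object values stand in for real task ids and wrappers.
public class ContainsVsContainsKey
{
  public static void main(String[] args)
  {
    ConcurrentHashMap<String, Object> tasks = new ConcurrentHashMap<String, Object>();
    tasks.put("task1", new Object());

    // Legacy method kept for Hashtable compatibility: checks VALUES, not keys.
    System.out.println(tasks.contains("task1"));    // prints false
    // The intended membership test on keys.
    System.out.println(tasks.containsKey("task1")); // prints true

    // Declaring the field as Map (as the patch does) removes the legacy method
    // from the call site entirely, since Map has no contains(Object).
    Map<String, Object> tasksAsMap = tasks;
    System.out.println(tasksAsMap.containsKey("task1")); // prints true
  }
}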