From 88661b26a04dd89fb914dc3f780d592576d0d634 Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 21 Aug 2013 11:14:54 -0700 Subject: [PATCH] bug fix for RTR removing workers race condition and partition chunks not being sorted by chunk number --- .../partition/IntegerPartitionChunk.java | 7 +---- .../druid/partition/LinearPartitionChunk.java | 8 +++++- .../druid/partition/StringPartitionChunk.java | 9 ++----- .../partition/IntegerPartitionChunkTest.java | 12 ++++----- .../partition/StringPartitionChunkTest.java | 14 +++++----- .../coordinator/RemoteTaskRunner.java | 16 +++++++++--- .../coordinator/RemoteTaskRunnerWorkItem.java | 19 ++++++++++++-- .../coordinator/RemoteTaskRunnerTest.java | 26 +++++++++++++++++++ 8 files changed, 79 insertions(+), 32 deletions(-) diff --git a/common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java b/common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java index a49313711ec..a2c511d1f99 100644 --- a/common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java +++ b/common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java @@ -97,12 +97,7 @@ public class IntegerPartitionChunk implements PartitionChunk if (chunk instanceof IntegerPartitionChunk) { IntegerPartitionChunk intChunk = (IntegerPartitionChunk) chunk; - int retVal = comparator.compare(start, intChunk.start); - if (retVal == 0) { - retVal = comparator.compare(end, intChunk.end); - } - - return retVal; + return comparator.compare(chunkNumber, intChunk.chunkNumber); } throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk."); } diff --git a/common/src/main/java/com/metamx/druid/partition/LinearPartitionChunk.java b/common/src/main/java/com/metamx/druid/partition/LinearPartitionChunk.java index 2f9cee1fe8a..cbc299f820e 100644 --- a/common/src/main/java/com/metamx/druid/partition/LinearPartitionChunk.java +++ b/common/src/main/java/com/metamx/druid/partition/LinearPartitionChunk.java @@ -1,7 +1,13 @@ package com.metamx.druid.partition; +import com.google.common.collect.Ordering; + +import java.util.Comparator; + public class LinearPartitionChunk implements PartitionChunk { + Comparator comparator = Ordering.natural().nullsFirst(); + private final int chunkNumber; private final T object; @@ -56,7 +62,7 @@ public class LinearPartitionChunk implements PartitionChunk if (chunk instanceof LinearPartitionChunk) { LinearPartitionChunk linearChunk = (LinearPartitionChunk) chunk; - return chunkNumber - chunk.getChunkNumber(); + return comparator.compare(chunkNumber, linearChunk.chunkNumber); } throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk."); } diff --git a/common/src/main/java/com/metamx/druid/partition/StringPartitionChunk.java b/common/src/main/java/com/metamx/druid/partition/StringPartitionChunk.java index 39d0e71ae76..54b067faf7d 100644 --- a/common/src/main/java/com/metamx/druid/partition/StringPartitionChunk.java +++ b/common/src/main/java/com/metamx/druid/partition/StringPartitionChunk.java @@ -27,7 +27,7 @@ import java.util.Comparator; */ public class StringPartitionChunk implements PartitionChunk { - private static final Comparator comparator = Ordering.natural().nullsFirst(); + private static final Comparator comparator = Ordering.natural().nullsFirst(); private final String start; private final String end; @@ -95,12 +95,7 @@ public class StringPartitionChunk implements PartitionChunk if (chunk instanceof StringPartitionChunk) { StringPartitionChunk stringChunk = (StringPartitionChunk) chunk; - int retVal = comparator.compare(start, stringChunk.start); - if (retVal == 0) { - retVal = comparator.compare(end, stringChunk.end); - } - - return retVal; + return comparator.compare(chunkNumber, stringChunk.chunkNumber); } throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk."); } diff --git a/common/src/test/java/com/metamx/druid/partition/IntegerPartitionChunkTest.java b/common/src/test/java/com/metamx/druid/partition/IntegerPartitionChunkTest.java index 0acb92911f5..f408f2b04cd 100644 --- a/common/src/test/java/com/metamx/druid/partition/IntegerPartitionChunkTest.java +++ b/common/src/test/java/com/metamx/druid/partition/IntegerPartitionChunkTest.java @@ -62,13 +62,13 @@ public class IntegerPartitionChunkTest public void testCompareTo() throws Exception { Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 1))); - Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 1, 2))); - Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 1, 2))); - Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 1, 2))); + Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 0, 2))); + Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 0, 2))); + Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 0, 2))); Assert.assertEquals(-1, make(null, 10, 0, 1).compareTo(make(10, null, 1, 2))); - Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 0, 1))); - Assert.assertEquals(1, make(20, 33, 0, 1).compareTo(make(11, 20, 0, 1))); - Assert.assertEquals(1, make(10, null, 0, 1).compareTo(make(null, 10, 0, 1))); + Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 1, 1))); + Assert.assertEquals(1, make(20, 33, 1, 1).compareTo(make(11, 20, 0, 1))); + Assert.assertEquals(1, make(10, null, 1, 1).compareTo(make(null, 10, 0, 1))); } @Test diff --git a/common/src/test/java/com/metamx/druid/partition/StringPartitionChunkTest.java b/common/src/test/java/com/metamx/druid/partition/StringPartitionChunkTest.java index c6a7cdfd005..8a6cadf1743 100644 --- a/common/src/test/java/com/metamx/druid/partition/StringPartitionChunkTest.java +++ b/common/src/test/java/com/metamx/druid/partition/StringPartitionChunkTest.java @@ -61,14 +61,14 @@ public class StringPartitionChunkTest @Test public void testCompareTo() throws Exception { - Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 1, 2))); - Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 1, 2))); - Assert.assertEquals(0, make(null, "10", 0, 1).compareTo(make(null, "10", 1, 2))); - Assert.assertEquals(0, make("10", "11", 0, 1).compareTo(make("10", "11", 1, 2))); + Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 2))); + Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 0, 2))); + Assert.assertEquals(0, make(null, "10", 1, 1).compareTo(make(null, "10", 1, 2))); + Assert.assertEquals(0, make("10", "11", 1, 1).compareTo(make("10", "11", 1, 2))); Assert.assertEquals(-1, make(null, "10", 0, 1).compareTo(make("10", null, 1, 2))); - Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 0, 1))); - Assert.assertEquals(1, make("20", "33", 0, 1).compareTo(make("11", "20", 0, 1))); - Assert.assertEquals(1, make("10", null, 0, 1).compareTo(make(null, "10", 0, 1))); + Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 1, 1))); + Assert.assertEquals(1, make("20", "33", 1, 1).compareTo(make("11", "20", 0, 1))); + Assert.assertEquals(1, make("10", null, 1, 1).compareTo(make(null, "10", 0, 1))); } @Test diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index 94e1ff80bd7..e5bfc3a01ee 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -521,7 +521,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ); } - runningTasks.put(task.getId(), pendingTasks.remove(task.getId())); + runningTasks.put(task.getId(), pendingTasks.remove(task.getId()).withWorker(theWorker)); log.info("Task %s switched from pending to running", task.getId()); // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running @@ -615,6 +615,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider if (taskRunnerWorkItem != null) { log.info("Task %s just disappeared!", taskId); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); + } else { + log.warn("Task %s just disappeared but I didn't know about it?!", taskId); } break; } @@ -653,13 +655,21 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider { log.info("Kaboom! Worker[%s] removed!", worker.getHost()); - ZkWorker zkWorker = zkWorkers.get(worker.getHost()); + final ZkWorker zkWorker = zkWorkers.get(worker.getHost()); if (zkWorker != null) { try { List tasksToFail = Lists.newArrayList( cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())) ); - tasksToFail.addAll(zkWorker.getRunningTaskIds()); + log.info("%s: Found %d tasks assigned", worker.getHost(), tasksToFail.size()); + + for (Map.Entry entry : runningTasks.entrySet()) { + if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) { + log.info("%s: Found %s running", worker.getHost(), entry.getKey()); + tasksToFail.add(entry.getKey()); + } + } + for (String assignedTask : tasksToFail) { RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask); if (taskRunnerWorkItem != null) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java index 72cb7155af8..be60c758ab6 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java @@ -22,6 +22,7 @@ package com.metamx.druid.indexing.coordinator; import com.google.common.util.concurrent.SettableFuture; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; +import com.metamx.druid.indexing.worker.Worker; import org.joda.time.DateTime; /** @@ -29,6 +30,7 @@ import org.joda.time.DateTime; public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem { private final SettableFuture result; + private final Worker worker; public RemoteTaskRunnerWorkItem( Task task, @@ -37,17 +39,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem { super(task, result); this.result = result; + this.worker = null; } public RemoteTaskRunnerWorkItem( Task task, SettableFuture result, DateTime createdTime, - DateTime queueInsertionTime + DateTime queueInsertionTime, + Worker worker ) { super(task, result, createdTime, queueInsertionTime); this.result = result; + this.worker = worker; + } + + public Worker getWorker() + { + return worker; } public void setResult(TaskStatus status) @@ -58,6 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem @Override public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time) { - return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time); + return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker); + } + + public RemoteTaskRunnerWorkItem withWorker(Worker worker) + { + return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), worker); } } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 492251dfc6f..a22664ec77b 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -50,10 +50,13 @@ import java.io.File; import java.util.Arrays; import java.util.Set; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** + * Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this + * class as well as integration tests in the very near future. */ public class RemoteTaskRunnerTest { @@ -277,6 +280,29 @@ public class RemoteTaskRunnerTest Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode()); } + @Test + public void testWorkerRemoved() throws Exception + { + doSetup(); + remoteTaskRunner.bootstrap(Lists.newArrayList()); + Future future = remoteTaskRunner.run(makeTask(TaskStatus.running("task"))); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Cannot find running task"); + } + } + + workerCuratorCoordinator.stop(); + + TaskStatus status = future.get(); + + Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode()); + } + private void doSetup() throws Exception { makeWorker();