bug fix for RTR removing workers race condition and partition chunks not being sorted by chunk number

This commit is contained in:
fjy 2013-08-21 11:14:54 -07:00
parent ed666d9d5f
commit 88661b26a0
8 changed files with 79 additions and 32 deletions

View File

@ -97,12 +97,7 @@ public class IntegerPartitionChunk<T> implements PartitionChunk<T>
if (chunk instanceof IntegerPartitionChunk) {
IntegerPartitionChunk<T> intChunk = (IntegerPartitionChunk<T>) chunk;
int retVal = comparator.compare(start, intChunk.start);
if (retVal == 0) {
retVal = comparator.compare(end, intChunk.end);
}
return retVal;
return comparator.compare(chunkNumber, intChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
}

View File

@ -1,7 +1,13 @@
package com.metamx.druid.partition;
import com.google.common.collect.Ordering;
import java.util.Comparator;
public class LinearPartitionChunk <T> implements PartitionChunk<T>
{
Comparator<Integer> comparator = Ordering.<Integer>natural().nullsFirst();
private final int chunkNumber;
private final T object;
@ -56,7 +62,7 @@ public class LinearPartitionChunk <T> implements PartitionChunk<T>
if (chunk instanceof LinearPartitionChunk) {
LinearPartitionChunk<T> linearChunk = (LinearPartitionChunk<T>) chunk;
return chunkNumber - chunk.getChunkNumber();
return comparator.compare(chunkNumber, linearChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk.");
}

View File

@ -27,7 +27,7 @@ import java.util.Comparator;
*/
public class StringPartitionChunk<T> implements PartitionChunk<T>
{
private static final Comparator<String> comparator = Ordering.<String>natural().nullsFirst();
private static final Comparator<Integer> comparator = Ordering.<Integer>natural().nullsFirst();
private final String start;
private final String end;
@ -95,12 +95,7 @@ public class StringPartitionChunk<T> implements PartitionChunk<T>
if (chunk instanceof StringPartitionChunk) {
StringPartitionChunk<T> stringChunk = (StringPartitionChunk<T>) chunk;
int retVal = comparator.compare(start, stringChunk.start);
if (retVal == 0) {
retVal = comparator.compare(end, stringChunk.end);
}
return retVal;
return comparator.compare(chunkNumber, stringChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
}

View File

@ -62,13 +62,13 @@ public class IntegerPartitionChunkTest
public void testCompareTo() throws Exception
{
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 1)));
Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 1, 2)));
Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 1, 2)));
Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 1, 2)));
Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 0, 2)));
Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 0, 2)));
Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 0, 2)));
Assert.assertEquals(-1, make(null, 10, 0, 1).compareTo(make(10, null, 1, 2)));
Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 0, 1)));
Assert.assertEquals(1, make(20, 33, 0, 1).compareTo(make(11, 20, 0, 1)));
Assert.assertEquals(1, make(10, null, 0, 1).compareTo(make(null, 10, 0, 1)));
Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 1, 1)));
Assert.assertEquals(1, make(20, 33, 1, 1).compareTo(make(11, 20, 0, 1)));
Assert.assertEquals(1, make(10, null, 1, 1).compareTo(make(null, 10, 0, 1)));
}
@Test

View File

@ -61,14 +61,14 @@ public class StringPartitionChunkTest
@Test
public void testCompareTo() throws Exception
{
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 1, 2)));
Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 1, 2)));
Assert.assertEquals(0, make(null, "10", 0, 1).compareTo(make(null, "10", 1, 2)));
Assert.assertEquals(0, make("10", "11", 0, 1).compareTo(make("10", "11", 1, 2)));
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 2)));
Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 0, 2)));
Assert.assertEquals(0, make(null, "10", 1, 1).compareTo(make(null, "10", 1, 2)));
Assert.assertEquals(0, make("10", "11", 1, 1).compareTo(make("10", "11", 1, 2)));
Assert.assertEquals(-1, make(null, "10", 0, 1).compareTo(make("10", null, 1, 2)));
Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 0, 1)));
Assert.assertEquals(1, make("20", "33", 0, 1).compareTo(make("11", "20", 0, 1)));
Assert.assertEquals(1, make("10", null, 0, 1).compareTo(make(null, "10", 0, 1)));
Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 1, 1)));
Assert.assertEquals(1, make("20", "33", 1, 1).compareTo(make("11", "20", 0, 1)));
Assert.assertEquals(1, make("10", null, 1, 1).compareTo(make(null, "10", 0, 1)));
}
@Test

View File

@ -521,7 +521,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
);
}
runningTasks.put(task.getId(), pendingTasks.remove(task.getId()));
runningTasks.put(task.getId(), pendingTasks.remove(task.getId()).withWorker(theWorker));
log.info("Task %s switched from pending to running", task.getId());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
@ -615,6 +615,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("Task %s just disappeared but I didn't know about it?!", taskId);
}
break;
}
@ -653,13 +655,21 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
{
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
List<String> tasksToFail = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
);
tasksToFail.addAll(zkWorker.getRunningTaskIds());
log.info("%s: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
log.info("%s: Found %s running", worker.getHost(), entry.getKey());
tasksToFail.add(entry.getKey());
}
}
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {

View File

@ -22,6 +22,7 @@ package com.metamx.druid.indexing.coordinator;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
/**
@ -29,6 +30,7 @@ import org.joda.time.DateTime;
public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final SettableFuture<TaskStatus> result;
private final Worker worker;
public RemoteTaskRunnerWorkItem(
Task task,
@ -37,17 +39,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
super(task, result);
this.result = result;
this.worker = null;
}
public RemoteTaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result,
DateTime createdTime,
DateTime queueInsertionTime
DateTime queueInsertionTime,
Worker worker
)
{
super(task, result, createdTime, queueInsertionTime);
this.result = result;
this.worker = worker;
}
public Worker getWorker()
{
return worker;
}
public void setResult(TaskStatus status)
@ -58,6 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
@Override
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time);
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker);
}
public RemoteTaskRunnerWorkItem withWorker(Worker worker)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), worker);
}
}

View File

@ -50,10 +50,13 @@ import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this
* class as well as integration tests in the very near future.
*/
public class RemoteTaskRunnerTest
{
@ -277,6 +280,29 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
@Test
public void testWorkerRemoved() throws Exception
{
doSetup();
remoteTaskRunner.bootstrap(Lists.<Task>newArrayList());
Future<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
workerCuratorCoordinator.stop();
TaskStatus status = future.get();
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
}
private void doSetup() throws Exception
{
makeWorker();