Make RTR assign pending tasks by insertion order (#3405)

This commit is contained in:
Nishant 2016-08-31 00:52:44 +05:30 committed by Gian Merlino
parent b11e9544ea
commit 4c2b8d29d3
2 changed files with 36 additions and 0 deletions

View File

@ -88,6 +88,8 @@ import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -604,6 +606,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
// into running status
List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
sortByInsertionTime(copy);
for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
String taskId = taskRunnerWorkItem.getTaskId();
if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
@ -641,6 +645,20 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
}
@VisibleForTesting
static void sortByInsertionTime(List<RemoteTaskRunnerWorkItem> tasks)
{
Collections.sort(tasks, new Comparator<RemoteTaskRunnerWorkItem>()
{
@Override
public int compare(RemoteTaskRunnerWorkItem o1, RemoteTaskRunnerWorkItem o2)
{
return o1.getQueueInsertionTime().compareTo(o2.getQueueInsertionTime());
}
}
);
}
/**
* Removes a task from the complete queue and clears out the ZK status path of the task.
*

View File

@ -25,6 +25,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.EmittingLogger;
@ -41,12 +42,14 @@ import io.druid.indexing.worker.Worker;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Future;
@ -590,4 +593,19 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
@Test
public void testSortByInsertionTime() throws Exception
{
RemoteTaskRunnerWorkItem item1 = new RemoteTaskRunnerWorkItem("b", null, null)
.withQueueInsertionTime(new DateTime("2015-01-01T00:00:03Z"));
RemoteTaskRunnerWorkItem item2 = new RemoteTaskRunnerWorkItem("a", null, null)
.withQueueInsertionTime(new DateTime("2015-01-01T00:00:02Z"));
RemoteTaskRunnerWorkItem item3 = new RemoteTaskRunnerWorkItem("c", null, null)
.withQueueInsertionTime(new DateTime("2015-01-01T00:00:01Z"));
ArrayList<RemoteTaskRunnerWorkItem> workItems = Lists.newArrayList(item1, item2, item3);
RemoteTaskRunner.sortByInsertionTime(workItems);
Assert.assertEquals(item3,workItems.get(0));
Assert.assertEquals(item2, workItems.get(1));
Assert.assertEquals(item1, workItems.get(2));
}
}