diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 8b8f6cceded..8643b0f6c60 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -88,6 +88,8 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -604,6 +606,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them // into running status List copy = Lists.newArrayList(pendingTasks.values()); + sortByInsertionTime(copy); + for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) { String taskId = taskRunnerWorkItem.getTaskId(); if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) { @@ -641,6 +645,20 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer ); } + @VisibleForTesting + static void sortByInsertionTime(List tasks) + { + Collections.sort(tasks, new Comparator() + { + @Override + public int compare(RemoteTaskRunnerWorkItem o1, RemoteTaskRunnerWorkItem o2) + { + return o1.getQueueInsertionTime().compareTo(o2.getQueueInsertionTime()); + } + } + ); + } + /** * Removes a task from the complete queue and clears out the ZK status path of the task. * diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 72381a06559..02f77b40fd6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -25,6 +25,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.emitter.EmittingLogger; @@ -41,12 +42,14 @@ import io.druid.indexing.worker.Worker; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; import java.util.Collection; import java.util.Set; import java.util.concurrent.Future; @@ -590,4 +593,19 @@ public class RemoteTaskRunnerTest Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode()); } + @Test + public void testSortByInsertionTime() throws Exception + { + RemoteTaskRunnerWorkItem item1 = new RemoteTaskRunnerWorkItem("b", null, null) + .withQueueInsertionTime(new DateTime("2015-01-01T00:00:03Z")); + RemoteTaskRunnerWorkItem item2 = new RemoteTaskRunnerWorkItem("a", null, null) + .withQueueInsertionTime(new DateTime("2015-01-01T00:00:02Z")); + RemoteTaskRunnerWorkItem item3 = new RemoteTaskRunnerWorkItem("c", null, null) + .withQueueInsertionTime(new DateTime("2015-01-01T00:00:01Z")); + ArrayList workItems = Lists.newArrayList(item1, item2, item3); + RemoteTaskRunner.sortByInsertionTime(workItems); + Assert.assertEquals(item3,workItems.get(0)); + Assert.assertEquals(item2, workItems.get(1)); + Assert.assertEquals(item1, workItems.get(2)); + } }