diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index d3a58240d0f..2f8f224289f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -165,9 +165,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java index e2d20281e04..2ad8fae335f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java @@ -37,9 +37,22 @@ public class TaskLock private final String dataSource; private final Interval interval; private final String version; - private final int priority; + private final Integer priority; private final boolean revoked; + public static TaskLock withPriority(TaskLock lock, int priority) + { + return new TaskLock( + lock.type, + lock.getGroupId(), + lock.getDataSource(), + lock.getInterval(), + lock.getVersion(), + priority, + lock.isRevoked() + ); + } + @JsonCreator public TaskLock( @JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility @@ -47,7 +60,7 @@ public class TaskLock @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("version") String version, - @JsonProperty("priority") int priority, + @JsonProperty("priority") @Nullable Integer priority, @JsonProperty("revoked") boolean revoked ) { @@ -116,11 +129,17 @@ public class TaskLock } @JsonProperty - public int getPriority() + @Nullable + public Integer getPriority() { return priority; } + public int getNonNullPriority() + { + return Preconditions.checkNotNull(priority, "priority"); + } + @JsonProperty public boolean isRevoked() { @@ -139,7 +158,7 @@ public class TaskLock this.dataSource.equals(that.dataSource) && this.interval.equals(that.interval) && this.version.equals(that.version) && - this.priority == that.priority && + Objects.equal(this.priority, that.priority) && this.revoked == that.revoked; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 5dcc35bde4a..fa930143f6a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -202,9 +202,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 05c9c578a0a..f81999f15f2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -183,9 +183,9 @@ public class CompactionTask extends AbstractTask } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_MERGE_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); } @VisibleForTesting diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index a49239abcc7..4a72114be82 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -171,9 +171,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index ac1c94dae1c..f91eaf9a27b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -241,9 +241,9 @@ public class IndexTask extends AbstractTask implements ChatHandler } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 675f27a64f6..b1f386c0bc9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -133,9 +133,9 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_MERGE_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index fab5a9d6b06..e704d6aa066 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -152,9 +152,9 @@ public class NoopTask extends AbstractTask } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } public static NoopTask create() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 5ea6c081020..37a7ccf1139 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -161,9 +161,9 @@ public class RealtimeIndexTask extends AbstractTask } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 8848f7fa8a5..727c35e0854 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -98,14 +98,6 @@ public interface Task return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY); } - /** - * Returns the default task priority. It can vary depending on the task type. - */ - default int getDefaultPriority() - { - return Tasks.DEFAULT_TASK_PRIORITY; - } - /** * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may * require. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 93f62b70eaf..c3cb42823c4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -130,17 +130,23 @@ public class TaskLockbox final TaskLock savedTaskLock = taskAndLock.rhs; if (savedTaskLock.getInterval().toDurationMillis() <= 0) { // "Impossible", but you never know what crazy stuff can be restored from storage. - log.warn("WTF?! Got lock with empty interval for task: %s", task.getId()); + log.warn("WTF?! Got lock[%s] with empty interval for task: %s", savedTaskLock, task.getId()); continue; } - final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock); + // Create a new taskLock if it doesn't have a proper priority, + // so that every taskLock in memory has the priority. + final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null + ? TaskLock.withPriority(savedTaskLock, task.getPriority()) + : savedTaskLock; + + final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLockWithPriority); if (taskLockPosse != null) { taskLockPosse.addTask(task); final TaskLock taskLock = taskLockPosse.getTaskLock(); - if (savedTaskLock.getVersion().equals(taskLock.getVersion())) { + if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) { taskLockCount++; log.info( "Reacquired lock[%s] for task: %s", @@ -151,8 +157,8 @@ public class TaskLockbox taskLockCount++; log.info( "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + savedTaskLockWithPriority.getInterval(), + savedTaskLockWithPriority.getVersion(), taskLock.getVersion(), task.getId() ); @@ -160,8 +166,8 @@ public class TaskLockbox } else { throw new ISE( "Could not reacquire lock on interval[%s] version[%s] for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + savedTaskLockWithPriority.getInterval(), + savedTaskLockWithPriority.getVersion(), task.getId() ); } @@ -382,11 +388,14 @@ public class TaskLockbox taskLock.getDataSource(), task.getDataSource() ); + final int taskPriority = task.getPriority(); + final int lockPriority = taskLock.getNonNullPriority(); + Preconditions.checkArgument( - task.getPriority() == taskLock.getPriority(), + lockPriority == taskPriority, "lock priority[%s] is different from task priority[%s]", - taskLock.getPriority(), - task.getPriority() + lockPriority, + taskPriority ); return createOrFindLockPosse( @@ -396,7 +405,7 @@ public class TaskLockbox taskLock.getDataSource(), taskLock.getInterval(), taskLock.getVersion(), - taskLock.getPriority(), + taskPriority, taskLock.isRevoked() ); } @@ -925,7 +934,7 @@ public class TaskLockbox private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority) { final TaskLock existingLock = lockPosse.getTaskLock(); - return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority; + return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority; } private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) @@ -986,7 +995,7 @@ public class TaskLockbox boolean addTask(Task task) { Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId())); - Preconditions.checkArgument(taskLock.getPriority() == task.getPriority()); + Preconditions.checkArgument(taskLock.getNonNullPriority() == task.getPriority()); return taskIds.add(task.getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index f3b7876523e..8b8d5e74db6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -44,7 +44,6 @@ import io.druid.indexer.TaskStatusPlus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; @@ -176,11 +175,6 @@ public class OverlordResource public Response apply(TaskQueue taskQueue) { try { - // Set default priority if needed - final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY); - if (priority == null) { - task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority()); - } taskQueue.add(task); return Response.ok(ImmutableMap.of("task", task.getId())).build(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index f2dceb0ee30..442f304dc27 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -19,6 +19,9 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import io.druid.indexer.TaskStatus; @@ -261,6 +264,84 @@ public class TaskLockboxTest Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); } + @Test + public void testSyncFromStorageWithMissingTaskLockPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLockWithoutPriority(task.getGroupId(), task.getDataSource(), Intervals.of("2017/2018"), "v1") + ); + + final List beforeLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + lockbox.syncFromStorage(); + + final List afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); + } + + @Test + public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLock( + TaskLockType.EXCLUSIVE, + task.getGroupId(), + task.getDataSource(), + Intervals.of("2017/2018"), + "v1", + task.getPriority() + ) + ); + + final List beforeLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + lockbox.syncFromStorage(); + + final List afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); + } + + @Test + public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLock( + TaskLockType.EXCLUSIVE, + task.getGroupId(), + task.getDataSource(), + Intervals.of("2017/2018"), + "v1", + 10 + ) + ); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("lock priority[10] is different from task priority[50]"); + lockbox.syncFromStorage(); + } + @Test public void testRevokedLockSyncFromStorage() throws EntryExistsException { @@ -504,4 +585,67 @@ public class TaskLockboxTest .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) .collect(Collectors.toSet()); } + + private static class TaskLockWithoutPriority extends TaskLock + { + @JsonCreator + TaskLockWithoutPriority( + String groupId, + String dataSource, + Interval interval, + String version + ) + { + super(null, groupId, dataSource, interval, version, 0, false); + } + + @Override + @JsonProperty + public TaskLockType getType() + { + return super.getType(); + } + + @Override + @JsonProperty + public String getGroupId() + { + return super.getGroupId(); + } + + @Override + @JsonProperty + public String getDataSource() + { + return super.getDataSource(); + } + + @Override + @JsonProperty + public Interval getInterval() + { + return super.getInterval(); + } + + @Override + @JsonProperty + public String getVersion() + { + return super.getVersion(); + } + + @JsonIgnore + @Override + public Integer getPriority() + { + return super.getPriority(); + } + + @JsonIgnore + @Override + public boolean isRevoked() + { + return super.isRevoked(); + } + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index f947bb21dec..4c08cce4bf2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -31,13 +31,12 @@ import io.druid.curator.discovery.NoopServiceAnnouncer; import io.druid.discovery.DruidLeaderSelector; import io.druid.indexer.TaskLocation; import io.druid.indexer.TaskState; -import io.druid.indexer.TaskStatusPlus; import io.druid.indexer.TaskStatus; +import io.druid.indexer.TaskStatusPlus; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskLockbox; @@ -81,7 +80,6 @@ import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -233,12 +231,6 @@ public class OverlordTest Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity()); - final Map context = task_0.getContext(); - Assert.assertEquals(1, context.size()); - final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY); - Assert.assertNotNull(priority); - Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue()); - // Duplicate task - should fail response = overlordResource.taskPost(task_0, req); Assert.assertEquals(400, response.getStatus());