Fix IllegalArgumentException in TaskLockBox.syncFromStorage() when updating from 0.12.x to 0.12.2 (#6086)

* Fix TaskLockBox.syncFromStorage() when updating from 0.12.x to 0.12.2

* Make the priority of taskLock nullable

* fix test

* fix build
This commit is contained in:
Jihoon Son 2018-08-03 17:13:44 -07:00 committed by GitHub
parent 914058f288
commit ef2d6e9118
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 206 additions and 56 deletions

View File

@ -165,9 +165,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}
@Override

View File

@ -37,9 +37,22 @@ public class TaskLock
private final String dataSource;
private final Interval interval;
private final String version;
private final int priority;
private final Integer priority;
private final boolean revoked;
public static TaskLock withPriority(TaskLock lock, int priority)
{
return new TaskLock(
lock.type,
lock.getGroupId(),
lock.getDataSource(),
lock.getInterval(),
lock.getVersion(),
priority,
lock.isRevoked()
);
}
@JsonCreator
public TaskLock(
@JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility
@ -47,7 +60,7 @@ public class TaskLock
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@JsonProperty("priority") int priority,
@JsonProperty("priority") @Nullable Integer priority,
@JsonProperty("revoked") boolean revoked
)
{
@ -116,11 +129,17 @@ public class TaskLock
}
@JsonProperty
public int getPriority()
@Nullable
public Integer getPriority()
{
return priority;
}
public int getNonNullPriority()
{
return Preconditions.checkNotNull(priority, "priority");
}
@JsonProperty
public boolean isRevoked()
{
@ -139,7 +158,7 @@ public class TaskLock
this.dataSource.equals(that.dataSource) &&
this.interval.equals(that.interval) &&
this.version.equals(that.version) &&
this.priority == that.priority &&
Objects.equal(this.priority, that.priority) &&
this.revoked == that.revoked;
}
}

View File

@ -202,9 +202,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
}
@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}
@Override

View File

@ -183,9 +183,9 @@ public class CompactionTask extends AbstractTask
}
@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@VisibleForTesting

View File

@ -171,9 +171,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
}
@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override

View File

@ -241,9 +241,9 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override

View File

@ -133,9 +133,9 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
}
@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@Override

View File

@ -152,9 +152,9 @@ public class NoopTask extends AbstractTask
}
@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
public static NoopTask create()

View File

@ -161,9 +161,9 @@ public class RealtimeIndexTask extends AbstractTask
}
@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}
@Override

View File

@ -98,14 +98,6 @@ public interface Task
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
}
/**
* Returns the default task priority. It can vary depending on the task type.
*/
default int getDefaultPriority()
{
return Tasks.DEFAULT_TASK_PRIORITY;
}
/**
* Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
* require.

View File

@ -130,17 +130,23 @@ public class TaskLockbox
final TaskLock savedTaskLock = taskAndLock.rhs;
if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
// "Impossible", but you never know what crazy stuff can be restored from storage.
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
log.warn("WTF?! Got lock[%s] with empty interval for task: %s", savedTaskLock, task.getId());
continue;
}
final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock);
// Create a new taskLock if it doesn't have a proper priority,
// so that every taskLock in memory has the priority.
final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null
? TaskLock.withPriority(savedTaskLock, task.getPriority())
: savedTaskLock;
final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLockWithPriority);
if (taskLockPosse != null) {
taskLockPosse.addTask(task);
final TaskLock taskLock = taskLockPosse.getTaskLock();
if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
log.info(
"Reacquired lock[%s] for task: %s",
@ -151,8 +157,8 @@ public class TaskLockbox
taskLockCount++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
taskLock.getVersion(),
task.getId()
);
@ -160,8 +166,8 @@ public class TaskLockbox
} else {
throw new ISE(
"Could not reacquire lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
task.getId()
);
}
@ -382,11 +388,14 @@ public class TaskLockbox
taskLock.getDataSource(),
task.getDataSource()
);
final int taskPriority = task.getPriority();
final int lockPriority = taskLock.getNonNullPriority();
Preconditions.checkArgument(
task.getPriority() == taskLock.getPriority(),
lockPriority == taskPriority,
"lock priority[%s] is different from task priority[%s]",
taskLock.getPriority(),
task.getPriority()
lockPriority,
taskPriority
);
return createOrFindLockPosse(
@ -396,7 +405,7 @@ public class TaskLockbox
taskLock.getDataSource(),
taskLock.getInterval(),
taskLock.getVersion(),
taskLock.getPriority(),
taskPriority,
taskLock.isRevoked()
);
}
@ -925,7 +934,7 @@ public class TaskLockbox
private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
{
final TaskLock existingLock = lockPosse.getTaskLock();
return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority;
return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
}
private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
@ -986,7 +995,7 @@ public class TaskLockbox
boolean addTask(Task task)
{
Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId()));
Preconditions.checkArgument(taskLock.getPriority() == task.getPriority());
Preconditions.checkArgument(taskLock.getNonNullPriority() == task.getPriority());
return taskIds.add(task.getId());
}

View File

@ -44,7 +44,6 @@ import io.druid.indexer.TaskStatusPlus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
@ -176,11 +175,6 @@ public class OverlordResource
public Response apply(TaskQueue taskQueue)
{
try {
// Set default priority if needed
final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
if (priority == null) {
task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
}
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}

View File

@ -19,6 +19,9 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import io.druid.indexer.TaskStatus;
@ -261,6 +264,84 @@ public class TaskLockboxTest
Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
}
@Test
public void testSyncFromStorageWithMissingTaskLockPriority() throws EntryExistsException
{
final Task task = NoopTask.create();
taskStorage.insert(task, TaskStatus.running(task.getId()));
taskStorage.addLock(
task.getId(),
new TaskLockWithoutPriority(task.getGroupId(), task.getDataSource(), Intervals.of("2017/2018"), "v1")
);
final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream()
.flatMap(t -> taskStorage.getLocks(t.getId()).stream())
.collect(Collectors.toList());
final TaskLockbox lockbox = new TaskLockbox(taskStorage);
lockbox.syncFromStorage();
final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
.flatMap(t -> taskStorage.getLocks(t.getId()).stream())
.collect(Collectors.toList());
Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
}
@Test
public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsException
{
final Task task = NoopTask.create();
taskStorage.insert(task, TaskStatus.running(task.getId()));
taskStorage.addLock(
task.getId(),
new TaskLock(
TaskLockType.EXCLUSIVE,
task.getGroupId(),
task.getDataSource(),
Intervals.of("2017/2018"),
"v1",
task.getPriority()
)
);
final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream()
.flatMap(t -> taskStorage.getLocks(t.getId()).stream())
.collect(Collectors.toList());
final TaskLockbox lockbox = new TaskLockbox(taskStorage);
lockbox.syncFromStorage();
final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
.flatMap(t -> taskStorage.getLocks(t.getId()).stream())
.collect(Collectors.toList());
Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
}
@Test
public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException
{
final Task task = NoopTask.create();
taskStorage.insert(task, TaskStatus.running(task.getId()));
taskStorage.addLock(
task.getId(),
new TaskLock(
TaskLockType.EXCLUSIVE,
task.getGroupId(),
task.getDataSource(),
Intervals.of("2017/2018"),
"v1",
10
)
);
final TaskLockbox lockbox = new TaskLockbox(taskStorage);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("lock priority[10] is different from task priority[50]");
lockbox.syncFromStorage();
}
@Test
public void testRevokedLockSyncFromStorage() throws EntryExistsException
{
@ -504,4 +585,67 @@ public class TaskLockboxTest
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
.collect(Collectors.toSet());
}
private static class TaskLockWithoutPriority extends TaskLock
{
@JsonCreator
TaskLockWithoutPriority(
String groupId,
String dataSource,
Interval interval,
String version
)
{
super(null, groupId, dataSource, interval, version, 0, false);
}
@Override
@JsonProperty
public TaskLockType getType()
{
return super.getType();
}
@Override
@JsonProperty
public String getGroupId()
{
return super.getGroupId();
}
@Override
@JsonProperty
public String getDataSource()
{
return super.getDataSource();
}
@Override
@JsonProperty
public Interval getInterval()
{
return super.getInterval();
}
@Override
@JsonProperty
public String getVersion()
{
return super.getVersion();
}
@JsonIgnore
@Override
public Integer getPriority()
{
return super.getPriority();
}
@JsonIgnore
@Override
public boolean isRevoked()
{
return super.isRevoked();
}
}
}

View File

@ -31,13 +31,12 @@ import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.discovery.DruidLeaderSelector;
import io.druid.indexer.TaskLocation;
import io.druid.indexer.TaskState;
import io.druid.indexer.TaskStatusPlus;
import io.druid.indexer.TaskStatus;
import io.druid.indexer.TaskStatusPlus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskLockbox;
@ -81,7 +80,6 @@ import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@ -233,12 +231,6 @@ public class OverlordTest
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
final Map<String, Object> context = task_0.getContext();
Assert.assertEquals(1, context.size());
final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
Assert.assertNotNull(priority);
Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());
// Duplicate task - should fail
response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(400, response.getStatus());