Fix mismatch in revoked task locks between memory and metastore after sync from storage (#5858)

* Fix mismatched revoked task locks after sync from storage

* fix build

* fix log

* fix lock release
This commit is contained in:
Jihoon Son 2018-06-12 07:25:34 -07:00 committed by Gian Merlino
parent 0ae4aba4e2
commit 2feec44a55
3 changed files with 171 additions and 39 deletions

View File

@ -258,15 +258,15 @@ public class MetadataTaskStorage implements TaskStorage
Preconditions.checkNotNull(newLock, "newLock");
log.info(
"Replacing lock on interval[%s] version[%s] for task: %s",
oldLock.getInterval(),
oldLock.getVersion(),
"Replacing an existing lock[%s] with a new lock[%s] for task: %s",
oldLock,
newLock,
taskid
);
final Long oldLockId = handler.getLockId(taskid, oldLock);
if (oldLockId == null) {
throw new ISE("Cannot find lock[%s]", oldLock);
throw new ISE("Cannot find an existing lock[%s]", oldLock);
}
handler.replaceLock(taskid, oldLockId, newLock);

View File

@ -31,14 +31,15 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.emitter.EmittingLogger;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -133,12 +134,7 @@ public class TaskLockbox
continue;
}
final TaskLockPosse taskLockPosse = createOrFindLockPosse(
task,
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
savedTaskLock.getType()
);
final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock);
if (taskLockPosse != null) {
taskLockPosse.addTask(task);
@ -147,9 +143,8 @@ public class TaskLockbox
if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
log.info(
"Reacquired lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
"Reacquired lock[%s] for task: %s",
taskLock,
task.getId()
);
} else {
@ -340,7 +335,7 @@ public class TaskLockbox
*
* @return a lock posse or null if any posse is found and a new poss cannot be created
*
* @see #createNewTaskLockPosse(TaskLockType, String, String, Interval, String, int)
* @see #createNewTaskLockPosse
*/
@Nullable
private TaskLockPosse createOrFindLockPosse(
@ -353,8 +348,78 @@ public class TaskLockbox
giant.lock();
try {
final String dataSource = task.getDataSource();
final int priority = task.getPriority();
return createOrFindLockPosse(
lockType,
task.getId(),
task.getGroupId(),
task.getDataSource(),
interval,
preferredVersion,
task.getPriority(),
false
);
}
finally {
giant.unlock();
}
}
@Nullable
private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock)
{
giant.lock();
try {
Preconditions.checkArgument(
task.getGroupId().equals(taskLock.getGroupId()),
"lock groupId[%s] is different from task groupId[%s]",
taskLock.getGroupId(),
task.getGroupId()
);
Preconditions.checkArgument(
task.getDataSource().equals(taskLock.getDataSource()),
"lock dataSource[%s] is different from task dataSource[%s]",
taskLock.getDataSource(),
task.getDataSource()
);
Preconditions.checkArgument(
task.getPriority() == taskLock.getPriority(),
"lock priority[%s] is different from task priority[%s]",
taskLock.getPriority(),
task.getPriority()
);
return createOrFindLockPosse(
taskLock.getType(),
task.getId(),
taskLock.getGroupId(),
taskLock.getDataSource(),
taskLock.getInterval(),
taskLock.getVersion(),
taskLock.getPriority(),
taskLock.isRevoked()
);
}
finally {
giant.unlock();
}
}
@Nullable
private TaskLockPosse createOrFindLockPosse(
TaskLockType lockType,
String taskId,
String groupId,
String dataSource,
Interval interval,
@Nullable String preferredVersion,
int priority,
boolean revoked
)
{
giant.lock();
try {
final List<TaskLockPosse> foundPosses = findLockPossesOverlapsInterval(dataSource, interval);
if (foundPosses.size() > 0) {
@ -362,7 +427,7 @@ public class TaskLockbox
// If they can't be reused, check lock priority and revoke existing locks if possible.
final List<TaskLockPosse> filteredPosses = foundPosses
.stream()
.filter(posse -> matchGroupIdAndContainInterval(posse.taskLock, task, interval))
.filter(posse -> matchGroupIdAndContainInterval(posse.taskLock, groupId, interval))
.collect(Collectors.toList());
if (filteredPosses.size() == 0) {
@ -372,11 +437,12 @@ public class TaskLockbox
// Any number of shared locks can be acquired for the same dataSource and interval.
return createNewTaskLockPosse(
lockType,
task.getGroupId(),
groupId,
dataSource,
interval,
preferredVersion,
priority
priority,
revoked
);
} else {
if (isAllRevocable(foundPosses, priority)) {
@ -385,19 +451,38 @@ public class TaskLockbox
return createNewTaskLockPosse(
lockType,
task.getGroupId(),
groupId,
dataSource,
interval,
preferredVersion,
priority
priority,
revoked
);
} else {
log.info(
"Cannot create a new taskLockPosse for task[%s] and interval[%s] with priority[%d]"
+ " because existing locks[%s] have same or higher priorities",
task.getId(),
final String messagePrefix;
if (preferredVersion == null) {
messagePrefix = StringUtils.format(
"Cannot create a new taskLockPosse for task[%s], interval[%s], priority[%d], revoked[%s]",
taskId,
interval,
priority,
revoked
);
} else {
messagePrefix = StringUtils.format(
"Cannot create a new taskLockPosse for task[%s], interval[%s],"
+ " preferredVersion[%s], priority[%d], revoked[%s]",
taskId,
interval,
preferredVersion,
priority,
revoked
);
}
log.info(
"%s because existing locks[%s] have same or higher priorities",
messagePrefix,
foundPosses
);
return null;
@ -411,7 +496,7 @@ public class TaskLockbox
} else {
throw new ISE(
"Task[%s] already acquired a lock for interval[%s] but different type[%s]",
task.getId(),
taskId,
interval,
foundPosse.getTaskLock().getType()
);
@ -420,7 +505,7 @@ public class TaskLockbox
// case 3) we found multiple lock posses for the given task
throw new ISE(
"Task group[%s] has multiple locks for the same interval[%s]?",
task.getGroupId(),
groupId,
interval
);
}
@ -429,11 +514,12 @@ public class TaskLockbox
// Let's make a new one.
return createNewTaskLockPosse(
lockType,
task.getGroupId(),
groupId,
dataSource,
interval,
preferredVersion,
priority
priority,
revoked
);
}
}
@ -454,6 +540,7 @@ public class TaskLockbox
* @param interval interval to be locked
* @param preferredVersion preferred version string
* @param priority lock priority
* @param revoked indicate the lock is revoked
*
* @return a new {@link TaskLockPosse}
*/
@ -463,7 +550,8 @@ public class TaskLockbox
String dataSource,
Interval interval,
@Nullable String preferredVersion,
int priority
int priority,
boolean revoked
)
{
giant.lock();
@ -486,7 +574,7 @@ public class TaskLockbox
}
final TaskLockPosse posseToUse = new TaskLockPosse(
new TaskLock(lockType, groupId, dataSource, interval, version, priority)
new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked)
);
running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
.computeIfAbsent(interval, k -> new ArrayList<>())
@ -817,10 +905,10 @@ public class TaskLockbox
}
}
private static boolean matchGroupIdAndContainInterval(TaskLock existingLock, Task task, Interval interval)
private static boolean matchGroupIdAndContainInterval(TaskLock existingLock, String taskGroupId, Interval interval)
{
return existingLock.getInterval().contains(interval) &&
existingLock.getGroupId().equals(task.getGroupId());
existingLock.getGroupId().equals(taskGroupId);
}
private static boolean isAllSharedLocks(List<TaskLockPosse> lockPosses)
@ -863,7 +951,7 @@ public class TaskLockbox
}
@VisibleForTesting
public Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks()
Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks()
{
return running;
}
@ -921,7 +1009,7 @@ public class TaskLockbox
void forEachTask(Consumer<String> action)
{
Preconditions.checkNotNull(action);
Preconditions.checkNotNull(action, "action");
taskIds.forEach(action);
}

View File

@ -21,8 +21,6 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskStatus;
@ -33,6 +31,8 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import io.druid.metadata.EntryExistsException;
import io.druid.metadata.TestDerbyConnector;
@ -45,9 +45,11 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -259,6 +261,48 @@ public class TaskLockboxTest
Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
}
@Test
public void testRevokedLockSyncFromStorage() throws EntryExistsException
{
final TaskLockbox originalBox = new TaskLockbox(taskStorage);
final Task task1 = NoopTask.create("task1", 10);
taskStorage.insert(task1, TaskStatus.running(task1.getId()));
originalBox.add(task1);
Assert.assertTrue(originalBox.tryLock(TaskLockType.EXCLUSIVE, task1, Intervals.of("2017/2018")).isOk());
// task2 revokes task1
final Task task2 = NoopTask.create("task2", 100);
taskStorage.insert(task2, TaskStatus.running(task2.getId()));
originalBox.add(task2);
Assert.assertTrue(originalBox.tryLock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2017/2018")).isOk());
final Map<String, List<TaskLock>> beforeLocksInStorage = taskStorage
.getActiveTasks()
.stream()
.collect(Collectors.toMap(Task::getId, task -> taskStorage.getLocks(task.getId())));
final List<TaskLock> task1Locks = beforeLocksInStorage.get("task1");
Assert.assertEquals(1, task1Locks.size());
Assert.assertTrue(task1Locks.get(0).isRevoked());
final List<TaskLock> task2Locks = beforeLocksInStorage.get("task1");
Assert.assertEquals(1, task2Locks.size());
Assert.assertTrue(task2Locks.get(0).isRevoked());
final TaskLockbox newBox = new TaskLockbox(taskStorage);
newBox.syncFromStorage();
final Set<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
.collect(Collectors.toSet());
Assert.assertEquals(
beforeLocksInStorage.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
afterLocksInStorage
);
}
@Test
public void testDoInCriticalSectionWithSharedLock() throws Exception
{