From 2feec44a55be1248711f5aefffe4cad9fad11b47 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 12 Jun 2018 07:25:34 -0700 Subject: [PATCH] Fix mismatch in revoked task locks between memory and metastore after sync from storage (#5858) * Fix mismatched revoked task locks after sync from storage * fix build * fix log * fix lock release --- .../overlord/MetadataTaskStorage.java | 8 +- .../druid/indexing/overlord/TaskLockbox.java | 154 ++++++++++++++---- .../indexing/overlord/TaskLockboxTest.java | 48 +++++- 3 files changed, 171 insertions(+), 39 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java index 011aaa5d167..4025b1c5b32 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java @@ -258,15 +258,15 @@ public class MetadataTaskStorage implements TaskStorage Preconditions.checkNotNull(newLock, "newLock"); log.info( - "Replacing lock on interval[%s] version[%s] for task: %s", - oldLock.getInterval(), - oldLock.getVersion(), + "Replacing an existing lock[%s] with a new lock[%s] for task: %s", + oldLock, + newLock, taskid ); final Long oldLockId = handler.getLockId(taskid, oldLock); if (oldLockId == null) { - throw new ISE("Cannot find lock[%s]", oldLock); + throw new ISE("Cannot find an existing lock[%s]", oldLock); } handler.replaceLock(taskid, oldLockId, newLock); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 189564a01e2..e436dae74e7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -31,14 +31,15 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.task.Task; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.Comparators; +import io.druid.java.util.emitter.EmittingLogger; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -133,12 +134,7 @@ public class TaskLockbox continue; } - final TaskLockPosse taskLockPosse = createOrFindLockPosse( - task, - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), - savedTaskLock.getType() - ); + final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock); if (taskLockPosse != null) { taskLockPosse.addTask(task); @@ -147,9 +143,8 @@ public class TaskLockbox if (savedTaskLock.getVersion().equals(taskLock.getVersion())) { taskLockCount++; log.info( - "Reacquired lock on interval[%s] version[%s] for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + "Reacquired lock[%s] for task: %s", + taskLock, task.getId() ); } else { @@ -340,7 +335,7 @@ public class TaskLockbox * * @return a lock posse or null if any posse is found and a new poss cannot be created * - * @see #createNewTaskLockPosse(TaskLockType, String, String, Interval, String, int) + * @see #createNewTaskLockPosse */ @Nullable private TaskLockPosse createOrFindLockPosse( @@ -353,8 +348,78 @@ public class TaskLockbox giant.lock(); try { - final String dataSource = task.getDataSource(); - final int priority = task.getPriority(); + return createOrFindLockPosse( + lockType, + task.getId(), + task.getGroupId(), + task.getDataSource(), + interval, + preferredVersion, + task.getPriority(), + false + ); + } + finally { + giant.unlock(); + } + } + + @Nullable + private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) + { + giant.lock(); + + try { + Preconditions.checkArgument( + task.getGroupId().equals(taskLock.getGroupId()), + "lock groupId[%s] is different from task groupId[%s]", + taskLock.getGroupId(), + task.getGroupId() + ); + Preconditions.checkArgument( + task.getDataSource().equals(taskLock.getDataSource()), + "lock dataSource[%s] is different from task dataSource[%s]", + taskLock.getDataSource(), + task.getDataSource() + ); + Preconditions.checkArgument( + task.getPriority() == taskLock.getPriority(), + "lock priority[%s] is different from task priority[%s]", + taskLock.getPriority(), + task.getPriority() + ); + + return createOrFindLockPosse( + taskLock.getType(), + task.getId(), + taskLock.getGroupId(), + taskLock.getDataSource(), + taskLock.getInterval(), + taskLock.getVersion(), + taskLock.getPriority(), + taskLock.isRevoked() + ); + } + finally { + giant.unlock(); + } + } + + @Nullable + private TaskLockPosse createOrFindLockPosse( + TaskLockType lockType, + String taskId, + String groupId, + String dataSource, + Interval interval, + @Nullable String preferredVersion, + int priority, + boolean revoked + ) + { + giant.lock(); + + try { final List foundPosses = findLockPossesOverlapsInterval(dataSource, interval); if (foundPosses.size() > 0) { @@ -362,7 +427,7 @@ public class TaskLockbox // If they can't be reused, check lock priority and revoke existing locks if possible. final List filteredPosses = foundPosses .stream() - .filter(posse -> matchGroupIdAndContainInterval(posse.taskLock, task, interval)) + .filter(posse -> matchGroupIdAndContainInterval(posse.taskLock, groupId, interval)) .collect(Collectors.toList()); if (filteredPosses.size() == 0) { @@ -372,11 +437,12 @@ public class TaskLockbox // Any number of shared locks can be acquired for the same dataSource and interval. return createNewTaskLockPosse( lockType, - task.getGroupId(), + groupId, dataSource, interval, preferredVersion, - priority + priority, + revoked ); } else { if (isAllRevocable(foundPosses, priority)) { @@ -385,19 +451,38 @@ public class TaskLockbox return createNewTaskLockPosse( lockType, - task.getGroupId(), + groupId, dataSource, interval, preferredVersion, - priority + priority, + revoked ); } else { + final String messagePrefix; + if (preferredVersion == null) { + messagePrefix = StringUtils.format( + "Cannot create a new taskLockPosse for task[%s], interval[%s], priority[%d], revoked[%s]", + taskId, + interval, + priority, + revoked + ); + } else { + messagePrefix = StringUtils.format( + "Cannot create a new taskLockPosse for task[%s], interval[%s]," + + " preferredVersion[%s], priority[%d], revoked[%s]", + taskId, + interval, + preferredVersion, + priority, + revoked + ); + } + log.info( - "Cannot create a new taskLockPosse for task[%s] and interval[%s] with priority[%d]" - + " because existing locks[%s] have same or higher priorities", - task.getId(), - interval, - priority, + "%s because existing locks[%s] have same or higher priorities", + messagePrefix, foundPosses ); return null; @@ -411,7 +496,7 @@ public class TaskLockbox } else { throw new ISE( "Task[%s] already acquired a lock for interval[%s] but different type[%s]", - task.getId(), + taskId, interval, foundPosse.getTaskLock().getType() ); @@ -420,7 +505,7 @@ public class TaskLockbox // case 3) we found multiple lock posses for the given task throw new ISE( "Task group[%s] has multiple locks for the same interval[%s]?", - task.getGroupId(), + groupId, interval ); } @@ -429,11 +514,12 @@ public class TaskLockbox // Let's make a new one. return createNewTaskLockPosse( lockType, - task.getGroupId(), + groupId, dataSource, interval, preferredVersion, - priority + priority, + revoked ); } } @@ -454,6 +540,7 @@ public class TaskLockbox * @param interval interval to be locked * @param preferredVersion preferred version string * @param priority lock priority + * @param revoked indicate the lock is revoked * * @return a new {@link TaskLockPosse} */ @@ -463,7 +550,8 @@ public class TaskLockbox String dataSource, Interval interval, @Nullable String preferredVersion, - int priority + int priority, + boolean revoked ) { giant.lock(); @@ -486,7 +574,7 @@ public class TaskLockbox } final TaskLockPosse posseToUse = new TaskLockPosse( - new TaskLock(lockType, groupId, dataSource, interval, version, priority) + new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked) ); running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd())) .computeIfAbsent(interval, k -> new ArrayList<>()) @@ -817,10 +905,10 @@ public class TaskLockbox } } - private static boolean matchGroupIdAndContainInterval(TaskLock existingLock, Task task, Interval interval) + private static boolean matchGroupIdAndContainInterval(TaskLock existingLock, String taskGroupId, Interval interval) { return existingLock.getInterval().contains(interval) && - existingLock.getGroupId().equals(task.getGroupId()); + existingLock.getGroupId().equals(taskGroupId); } private static boolean isAllSharedLocks(List lockPosses) @@ -863,7 +951,7 @@ public class TaskLockbox } @VisibleForTesting - public Map>> getAllLocks() + Map>> getAllLocks() { return running; } @@ -921,7 +1009,7 @@ public class TaskLockbox void forEachTask(Consumer action) { - Preconditions.checkNotNull(action); + Preconditions.checkNotNull(action, "action"); taskIds.forEach(action); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 33dd21076e2..0aa90d6401e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -21,8 +21,6 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskStatus; @@ -33,6 +31,8 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import io.druid.metadata.EntryExistsException; import io.druid.metadata.TestDerbyConnector; @@ -45,9 +45,11 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -259,6 +261,48 @@ public class TaskLockboxTest Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); } + @Test + public void testRevokedLockSyncFromStorage() throws EntryExistsException + { + final TaskLockbox originalBox = new TaskLockbox(taskStorage); + + final Task task1 = NoopTask.create("task1", 10); + taskStorage.insert(task1, TaskStatus.running(task1.getId())); + originalBox.add(task1); + Assert.assertTrue(originalBox.tryLock(TaskLockType.EXCLUSIVE, task1, Intervals.of("2017/2018")).isOk()); + + // task2 revokes task1 + final Task task2 = NoopTask.create("task2", 100); + taskStorage.insert(task2, TaskStatus.running(task2.getId())); + originalBox.add(task2); + Assert.assertTrue(originalBox.tryLock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2017/2018")).isOk()); + + final Map> beforeLocksInStorage = taskStorage + .getActiveTasks() + .stream() + .collect(Collectors.toMap(Task::getId, task -> taskStorage.getLocks(task.getId()))); + + final List task1Locks = beforeLocksInStorage.get("task1"); + Assert.assertEquals(1, task1Locks.size()); + Assert.assertTrue(task1Locks.get(0).isRevoked()); + + final List task2Locks = beforeLocksInStorage.get("task1"); + Assert.assertEquals(1, task2Locks.size()); + Assert.assertTrue(task2Locks.get(0).isRevoked()); + + final TaskLockbox newBox = new TaskLockbox(taskStorage); + newBox.syncFromStorage(); + + final Set afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) + .collect(Collectors.toSet()); + + Assert.assertEquals( + beforeLocksInStorage.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()), + afterLocksInStorage + ); + } + @Test public void testDoInCriticalSectionWithSharedLock() throws Exception {