Merge pull request #1740 from metamx/validate-locks

fix #1715
This commit is contained in:
Charles Allen 2015-09-29 09:38:42 -07:00
commit d2e400f063
4 changed files with 212 additions and 35 deletions

View File

@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
@ -37,10 +38,6 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.common.utils.JodaUtils; import io.druid.common.utils.JodaUtils;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -50,9 +47,12 @@ import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/** /**
* Remembers which tasks have locked which intervals. Tasks are permitted to lock an interval if no other task * Remembers which activeTasks have locked which intervals. Tasks are permitted to lock an interval if no other task
* outside their group has locked an overlapping interval for the same datasource. When a task locks an interval, * outside their group has locked an overlapping interval for the same datasource. When a task locks an interval,
* it is assigned a version string that it can use to publish segments. * it is assigned a version string that it can use to publish segments.
*/ */
@ -66,6 +66,10 @@ public class TaskLockbox
private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
// Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
// this set should be accessed under the giant lock.
private final Set<String> activeTasks = Sets.newHashSet();
@Inject @Inject
public TaskLockbox( public TaskLockbox(
TaskStorage taskStorage TaskStorage taskStorage
@ -103,8 +107,8 @@ public class TaskLockbox
} }
}; };
running.clear(); running.clear();
activeTasks.clear();
// Bookkeeping for a log message at the end // Bookkeeping for a log message at the end
final Set<String> uniqueTaskIds = Sets.newHashSet();
int taskLockCount = 0; int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) { for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
final Task task = taskAndLock.lhs; final Task task = taskAndLock.lhs;
@ -114,7 +118,7 @@ public class TaskLockbox
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId()); log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue; continue;
} }
uniqueTaskIds.add(task.getId()); activeTasks.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock( final Optional<TaskLock> acquiredTaskLock = tryLock(
task, task,
savedTaskLock.getInterval(), savedTaskLock.getInterval(),
@ -147,9 +151,9 @@ public class TaskLockbox
} }
} }
log.info( log.info(
"Synced %,d locks for %,d tasks from storage (%,d locks ignored).", "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
taskLockCount, taskLockCount,
uniqueTaskIds.size(), activeTasks.size(),
storedLocks.size() - taskLockCount storedLocks.size() - taskLockCount
); );
} finally { } finally {
@ -170,10 +174,8 @@ public class TaskLockbox
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{ {
giant.lock(); giant.lock();
try { try {
Optional<TaskLock> taskLock; Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) { while (!(taskLock = tryLock(task, interval)).isPresent()) {
lockReleaseCondition.await(); lockReleaseCondition.await();
} }
@ -192,6 +194,7 @@ public class TaskLockbox
* @param interval interval to lock * @param interval interval to lock
* *
* @return lock version if lock was acquired, absent otherwise * @return lock version if lock was acquired, absent otherwise
* @throws IllegalStateException if the task is not a valid active task
*/ */
public Optional<TaskLock> tryLock(final Task task, final Interval interval) public Optional<TaskLock> tryLock(final Task task, final Interval interval)
{ {
@ -210,12 +213,16 @@ public class TaskLockbox
* @param preferredVersion use this version string if one has not yet been assigned * @param preferredVersion use this version string if one has not yet been assigned
* *
* @return lock version if lock was acquired, absent otherwise * @return lock version if lock was acquired, absent otherwise
* @throws IllegalStateException if the task is not a valid active task
*/ */
private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion) private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
{ {
giant.lock(); giant.lock();
try { try {
if(!activeTasks.contains(task.getId())){
throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
}
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty"); Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
final String dataSource = task.getDataSource(); final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval); final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
@ -310,13 +317,13 @@ public class TaskLockbox
try { try {
return Lists.transform( return Lists.transform(
findLockPossesForTask(task), new Function<TaskLockPosse, TaskLock>() findLockPossesForTask(task), new Function<TaskLockPosse, TaskLock>()
{ {
@Override @Override
public TaskLock apply(TaskLockPosse taskLockPosse) public TaskLock apply(TaskLockPosse taskLockPosse)
{ {
return taskLockPosse.getTaskLock(); return taskLockPosse.getTaskLock();
} }
} }
); );
} finally { } finally {
giant.unlock(); giant.unlock();
@ -338,7 +345,7 @@ public class TaskLockbox
final String dataSource = task.getDataSource(); final String dataSource = task.getDataSource();
final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource); final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
// So we can alert if tasks try to release stuff they don't have // So we can alert if activeTasks try to release stuff they don't have
boolean removed = false; boolean removed = false;
if(dsRunning != null) { if(dsRunning != null) {
@ -388,17 +395,22 @@ public class TaskLockbox
} }
/** /**
* Release all locks for a task. Does nothing if the task is not currently locked. * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
* *
* @param task task to unlock * @param task task to unlock
*/ */
public void unlock(final Task task) public void remove(final Task task)
{ {
giant.lock(); giant.lock();
try { try {
for(final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) { try {
unlock(task, taskLockPosse.getTaskLock().getInterval()); log.info("Removing task[%s] from activeTasks", task.getId());
for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
unlock(task, taskLockPosse.getTaskLock().getInterval());
}
}
finally {
activeTasks.remove(task.getId());
} }
} }
finally { finally {
@ -503,6 +515,17 @@ public class TaskLockbox
} }
} }
public void add(Task task)
{
giant.lock();
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
} finally {
giant.unlock();
}
}
private static class TaskLockPosse private static class TaskLockPosse
{ {
final private TaskLock taskLock; final private TaskLock taskLock;

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
@ -44,6 +45,8 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.metadata.EntryExistsException; import io.druid.metadata.EntryExistsException;
import io.druid.query.DruidMetrics; import io.druid.query.DruidMetrics;
import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -53,6 +56,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
/** /**
* Interface between task producers and the task runner. * Interface between task producers and the task runner.
@ -308,7 +312,7 @@ public class TaskQueue
// If this throws with any sort of exception, including TaskExistsException, we don't want to // If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue. So don't catch it. // insert the task into our queue. So don't catch it.
taskStorage.insert(task, TaskStatus.running(task.getId())); taskStorage.insert(task, TaskStatus.running(task.getId()));
tasks.add(task); addTaskInternal(task);
managementMayBeNecessary.signalAll(); managementMayBeNecessary.signalAll();
return true; return true;
} }
@ -317,6 +321,18 @@ public class TaskQueue
} }
} }
// Should always be called after taking giantLock
private void addTaskInternal(final Task task){
tasks.add(task);
taskLockbox.add(task);
}
// Should always be called after taking giantLock
private void removeTaskInternal(final Task task){
taskLockbox.remove(task);
tasks.remove(task);
}
/** /**
* Shuts down a task if it has not yet finished. * Shuts down a task if it has not yet finished.
* *
@ -378,7 +394,7 @@ public class TaskQueue
for (int i = tasks.size() - 1; i >= 0; i--) { for (int i = tasks.size() - 1; i >= 0; i--) {
if (tasks.get(i).getId().equals(task.getId())) { if (tasks.get(i).getId().equals(task.getId())) {
removed++; removed++;
tasks.remove(i); removeTaskInternal(tasks.get(i));
break; break;
} }
} }
@ -397,7 +413,6 @@ public class TaskQueue
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
} else { } else {
taskStorage.setStatus(taskStatus); taskStorage.setStatus(taskStatus);
taskLockbox.unlock(task);
log.info("Task done: %s", task); log.info("Task done: %s", task);
managementMayBeNecessary.signalAll(); managementMayBeNecessary.signalAll();
} }
@ -498,15 +513,35 @@ public class TaskQueue
try { try {
if (active) { if (active) {
final List<Task> newTasks = taskStorage.getActiveTasks(); final Map<String,Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
final int tasksSynced = newTasks.size();
final Map<String,Task> oldTasks = toTaskIDMap(tasks);
// Calculate differences on IDs instead of Task Objects.
Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
for(String taskID : commonIds){
newTasks.remove(taskID);
oldTasks.remove(taskID);
}
Collection<Task> addedTasks = newTasks.values();
Collection<Task> removedTasks = oldTasks.values();
// Clean up removed Tasks
for(Task task : removedTasks){
removeTaskInternal(task);
}
// Add newly Added tasks to the queue
for(Task task : addedTasks){
addTaskInternal(task);
}
log.info( log.info(
"Synced %,d tasks from storage (%,d tasks added, %,d tasks removed).", "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
newTasks.size(), tasksSynced,
Sets.difference(Sets.newHashSet(newTasks), Sets.newHashSet(tasks)).size(), addedTasks.size(),
Sets.difference(Sets.newHashSet(tasks), Sets.newHashSet(newTasks)).size() removedTasks.size()
); );
tasks.clear();
tasks.addAll(newTasks);
managementMayBeNecessary.signalAll(); managementMayBeNecessary.signalAll();
} else { } else {
log.info("Not active. Skipping storage sync."); log.info("Not active. Skipping storage sync.");
@ -520,4 +555,13 @@ public class TaskQueue
giant.unlock(); giant.unlock();
} }
} }
private static Map<String,Task> toTaskIDMap(List<Task> taskList){
Map<String,Task> rv = Maps.newHashMap();
for(Task task : taskList){
rv.put(task.getId(), task);
}
return rv;
}
} }

View File

@ -0,0 +1,100 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.indexing.overlord;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
public class TaskLockboxTest
{
private TaskStorage taskStorage;
private TaskLockbox lockbox;
@Before
public void setUp(){
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
lockbox = new TaskLockbox(taskStorage);
}
@Test
public void testLock() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
Assert.assertNotNull(lockbox.lock(task, new Interval("2015-01-01/2015-01-02")));
}
@Test(expected = IllegalStateException.class)
public void testLockForInactiveTask() throws InterruptedException
{
lockbox.lock(NoopTask.create(),new Interval("2015-01-01/2015-01-02"));
}
@Test(expected = IllegalStateException.class)
public void testLockAfterTaskComplete() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
lockbox.lock(task, new Interval("2015-01-01/2015-01-02"));
}
@Test
public void testTryLock() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
Assert.assertTrue(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03")).isPresent());
// try to take lock for task 2 for overlapping interval
Task task2 = NoopTask.create();
lockbox.add(task2);
Assert.assertFalse(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent());
// task 1 unlocks the lock
lockbox.remove(task);
// Now task2 should be able to get the lock
Assert.assertTrue(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent());
}
@Test(expected = IllegalStateException.class)
public void testTryLockForInactiveTask() throws InterruptedException
{
Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent());
}
@Test(expected = IllegalStateException.class)
public void testTryLockAfterTaskComplete() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); }
}

View File

@ -118,8 +118,17 @@ public class OverlordResourceTest
taskLockbox = EasyMock.createStrictMock(TaskLockbox.class); taskLockbox = EasyMock.createStrictMock(TaskLockbox.class);
taskLockbox.syncFromStorage(); taskLockbox.syncFromStorage();
EasyMock.expectLastCall().atLeastOnce(); EasyMock.expectLastCall().atLeastOnce();
taskLockbox.unlock(EasyMock.<Task>anyObject()); taskLockbox.add(EasyMock.<Task>anyObject());
EasyMock.expectLastCall().atLeastOnce(); EasyMock.expectLastCall().atLeastOnce();
taskLockbox.remove(EasyMock.<Task>anyObject());
EasyMock.expectLastCall().atLeastOnce();
// for second Noop Task directly added to deep storage.
taskLockbox.add(EasyMock.<Task>anyObject());
EasyMock.expectLastCall().atLeastOnce();
taskLockbox.remove(EasyMock.<Task>anyObject());
EasyMock.expectLastCall().atLeastOnce();
taskActionClientFactory = EasyMock.createStrictMock(TaskActionClientFactory.class); taskActionClientFactory = EasyMock.createStrictMock(TaskActionClientFactory.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.<Task>anyObject())) EasyMock.expect(taskActionClientFactory.create(EasyMock.<Task>anyObject()))
.andReturn(null).anyTimes(); .andReturn(null).anyTimes();
@ -226,6 +235,7 @@ public class OverlordResourceTest
waitForTaskStatus(taskId_0, TaskStatus.Status.SUCCESS); waitForTaskStatus(taskId_0, TaskStatus.Status.SUCCESS);
// Manually insert task in taskStorage // Manually insert task in taskStorage
// Verifies sync from storage
final String taskId_1 = "1"; final String taskId_1 = "1";
NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null, null); NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null, null);
taskStorage.insert(task_1, TaskStatus.running(taskId_1)); taskStorage.insert(task_1, TaskStatus.running(taskId_1));