fixes #1715
- TaskLockBox has a set of active tasks
- lock requests throws exception for if they are from a task not in
active task set.
- TaskQueue is responsible for updating the active task set on
tasklockbox

fix #1715

fixes #1715
- TaskLockBox has a set of active tasks
- lock requests throws exception for if they are from a task not in
active task set.
- TaskQueue is responsible for updating the active task set on
tasklockbox

review comment

remove duplicate line

use ISE instead

organise imports
This commit is contained in:
Nishant 2015-09-17 02:03:08 +05:30
parent 35caa753aa
commit b638400acb
4 changed files with 212 additions and 35 deletions

View File

@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
@ -37,10 +38,6 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -50,9 +47,12 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
* Remembers which tasks have locked which intervals. Tasks are permitted to lock an interval if no other task
* Remembers which activeTasks have locked which intervals. Tasks are permitted to lock an interval if no other task
* outside their group has locked an overlapping interval for the same datasource. When a task locks an interval,
* it is assigned a version string that it can use to publish segments.
*/
@ -66,6 +66,10 @@ public class TaskLockbox
private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
// Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
// this set should be accessed under the giant lock.
private final Set<String> activeTasks = Sets.newHashSet();
@Inject
public TaskLockbox(
TaskStorage taskStorage
@ -103,8 +107,8 @@ public class TaskLockbox
}
};
running.clear();
activeTasks.clear();
// Bookkeeping for a log message at the end
final Set<String> uniqueTaskIds = Sets.newHashSet();
int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
final Task task = taskAndLock.lhs;
@ -114,7 +118,7 @@ public class TaskLockbox
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
uniqueTaskIds.add(task.getId());
activeTasks.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock(
task,
savedTaskLock.getInterval(),
@ -147,9 +151,9 @@ public class TaskLockbox
}
}
log.info(
"Synced %,d locks for %,d tasks from storage (%,d locks ignored).",
"Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
taskLockCount,
uniqueTaskIds.size(),
activeTasks.size(),
storedLocks.size() - taskLockCount
);
} finally {
@ -170,10 +174,8 @@ public class TaskLockbox
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{
giant.lock();
try {
Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) {
lockReleaseCondition.await();
}
@ -192,6 +194,7 @@ public class TaskLockbox
* @param interval interval to lock
*
* @return lock version if lock was acquired, absent otherwise
* @throws IllegalStateException if the task is not a valid active task
*/
public Optional<TaskLock> tryLock(final Task task, final Interval interval)
{
@ -210,12 +213,16 @@ public class TaskLockbox
* @param preferredVersion use this version string if one has not yet been assigned
*
* @return lock version if lock was acquired, absent otherwise
* @throws IllegalStateException if the task is not a valid active task
*/
private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
{
giant.lock();
try {
if(!activeTasks.contains(task.getId())){
throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
}
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
@ -338,7 +345,7 @@ public class TaskLockbox
final String dataSource = task.getDataSource();
final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
// So we can alert if tasks try to release stuff they don't have
// So we can alert if activeTasks try to release stuff they don't have
boolean removed = false;
if(dsRunning != null) {
@ -388,19 +395,24 @@ public class TaskLockbox
}
/**
* Release all locks for a task. Does nothing if the task is not currently locked.
* Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
*
* @param task task to unlock
*/
public void unlock(final Task task)
public void remove(final Task task)
{
giant.lock();
try {
for(final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
unlock(task, taskLockPosse.getTaskLock().getInterval());
}
}
finally {
activeTasks.remove(task.getId());
}
}
finally {
giant.unlock();
}
@ -503,6 +515,17 @@ public class TaskLockbox
}
}
public void add(Task task)
{
giant.lock();
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
} finally {
giant.unlock();
}
}
private static class TaskLockPosse
{
final private TaskLock taskLock;

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
@ -44,6 +45,8 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.metadata.EntryExistsException;
import io.druid.query.DruidMetrics;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -53,6 +56,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
/**
* Interface between task producers and the task runner.
@ -308,7 +312,7 @@ public class TaskQueue
// If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue. So don't catch it.
taskStorage.insert(task, TaskStatus.running(task.getId()));
tasks.add(task);
addTaskInternal(task);
managementMayBeNecessary.signalAll();
return true;
}
@ -317,6 +321,18 @@ public class TaskQueue
}
}
// Should always be called after taking giantLock
private void addTaskInternal(final Task task){
tasks.add(task);
taskLockbox.add(task);
}
// Should always be called after taking giantLock
private void removeTaskInternal(final Task task){
taskLockbox.remove(task);
tasks.remove(task);
}
/**
* Shuts down a task if it has not yet finished.
*
@ -378,7 +394,7 @@ public class TaskQueue
for (int i = tasks.size() - 1; i >= 0; i--) {
if (tasks.get(i).getId().equals(task.getId())) {
removed++;
tasks.remove(i);
removeTaskInternal(tasks.get(i));
break;
}
}
@ -397,7 +413,6 @@ public class TaskQueue
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
} else {
taskStorage.setStatus(taskStatus);
taskLockbox.unlock(task);
log.info("Task done: %s", task);
managementMayBeNecessary.signalAll();
}
@ -498,15 +513,35 @@ public class TaskQueue
try {
if (active) {
final List<Task> newTasks = taskStorage.getActiveTasks();
final Map<String,Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
final int tasksSynced = newTasks.size();
final Map<String,Task> oldTasks = toTaskIDMap(tasks);
// Calculate differences on IDs instead of Task Objects.
Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
for(String taskID : commonIds){
newTasks.remove(taskID);
oldTasks.remove(taskID);
}
Collection<Task> addedTasks = newTasks.values();
Collection<Task> removedTasks = oldTasks.values();
// Clean up removed Tasks
for(Task task : removedTasks){
removeTaskInternal(task);
}
// Add newly Added tasks to the queue
for(Task task : addedTasks){
addTaskInternal(task);
}
log.info(
"Synced %,d tasks from storage (%,d tasks added, %,d tasks removed).",
newTasks.size(),
Sets.difference(Sets.newHashSet(newTasks), Sets.newHashSet(tasks)).size(),
Sets.difference(Sets.newHashSet(tasks), Sets.newHashSet(newTasks)).size()
"Synced %d tasks from storage (%d tasks added, %d tasks removed).",
tasksSynced,
addedTasks.size(),
removedTasks.size()
);
tasks.clear();
tasks.addAll(newTasks);
managementMayBeNecessary.signalAll();
} else {
log.info("Not active. Skipping storage sync.");
@ -520,4 +555,13 @@ public class TaskQueue
giant.unlock();
}
}
private static Map<String,Task> toTaskIDMap(List<Task> taskList){
Map<String,Task> rv = Maps.newHashMap();
for(Task task : taskList){
rv.put(task.getId(), task);
}
return rv;
}
}

View File

@ -0,0 +1,100 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.indexing.overlord;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
public class TaskLockboxTest
{
private TaskStorage taskStorage;
private TaskLockbox lockbox;
@Before
public void setUp(){
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
lockbox = new TaskLockbox(taskStorage);
}
@Test
public void testLock() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
Assert.assertNotNull(lockbox.lock(task, new Interval("2015-01-01/2015-01-02")));
}
@Test(expected = IllegalStateException.class)
public void testLockForInactiveTask() throws InterruptedException
{
lockbox.lock(NoopTask.create(),new Interval("2015-01-01/2015-01-02"));
}
@Test(expected = IllegalStateException.class)
public void testLockAfterTaskComplete() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
lockbox.lock(task, new Interval("2015-01-01/2015-01-02"));
}
@Test
public void testTryLock() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
Assert.assertTrue(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03")).isPresent());
// try to take lock for task 2 for overlapping interval
Task task2 = NoopTask.create();
lockbox.add(task2);
Assert.assertFalse(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent());
// task 1 unlocks the lock
lockbox.remove(task);
// Now task2 should be able to get the lock
Assert.assertTrue(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent());
}
@Test(expected = IllegalStateException.class)
public void testTryLockForInactiveTask() throws InterruptedException
{
Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent());
}
@Test(expected = IllegalStateException.class)
public void testTryLockAfterTaskComplete() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); }
}

View File

@ -118,8 +118,17 @@ public class OverlordResourceTest
taskLockbox = EasyMock.createStrictMock(TaskLockbox.class);
taskLockbox.syncFromStorage();
EasyMock.expectLastCall().atLeastOnce();
taskLockbox.unlock(EasyMock.<Task>anyObject());
taskLockbox.add(EasyMock.<Task>anyObject());
EasyMock.expectLastCall().atLeastOnce();
taskLockbox.remove(EasyMock.<Task>anyObject());
EasyMock.expectLastCall().atLeastOnce();
// for second Noop Task directly added to deep storage.
taskLockbox.add(EasyMock.<Task>anyObject());
EasyMock.expectLastCall().atLeastOnce();
taskLockbox.remove(EasyMock.<Task>anyObject());
EasyMock.expectLastCall().atLeastOnce();
taskActionClientFactory = EasyMock.createStrictMock(TaskActionClientFactory.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.<Task>anyObject()))
.andReturn(null).anyTimes();
@ -226,6 +235,7 @@ public class OverlordResourceTest
waitForTaskStatus(taskId_0, TaskStatus.Status.SUCCESS);
// Manually insert task in taskStorage
// Verifies sync from storage
final String taskId_1 = "1";
NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null, null);
taskStorage.insert(task_1, TaskStatus.running(taskId_1));