diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 18f11ab9ffa..81612489469 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -30,6 +30,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; +import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.Comparators; import com.metamx.common.guava.FunctionalIterable; @@ -37,10 +38,6 @@ import com.metamx.emitter.EmittingLogger; import io.druid.common.utils.JodaUtils; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.task.Task; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -import javax.annotation.Nullable; import java.util.Collections; import java.util.List; import java.util.Map; @@ -50,9 +47,12 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; +import org.joda.time.DateTime; +import org.joda.time.Interval; /** - * Remembers which tasks have locked which intervals. Tasks are permitted to lock an interval if no other task + * Remembers which activeTasks have locked which intervals. Tasks are permitted to lock an interval if no other task * outside their group has locked an overlapping interval for the same datasource. When a task locks an interval, * it is assigned a version string that it can use to publish segments. */ @@ -66,6 +66,10 @@ public class TaskLockbox private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); + // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks. + // this set should be accessed under the giant lock. + private final Set activeTasks = Sets.newHashSet(); + @Inject public TaskLockbox( TaskStorage taskStorage @@ -103,8 +107,8 @@ public class TaskLockbox } }; running.clear(); + activeTasks.clear(); // Bookkeeping for a log message at the end - final Set uniqueTaskIds = Sets.newHashSet(); int taskLockCount = 0; for (final Pair taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) { final Task task = taskAndLock.lhs; @@ -114,7 +118,7 @@ public class TaskLockbox log.warn("WTF?! Got lock with empty interval for task: %s", task.getId()); continue; } - uniqueTaskIds.add(task.getId()); + activeTasks.add(task.getId()); final Optional acquiredTaskLock = tryLock( task, savedTaskLock.getInterval(), @@ -147,9 +151,9 @@ public class TaskLockbox } } log.info( - "Synced %,d locks for %,d tasks from storage (%,d locks ignored).", + "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).", taskLockCount, - uniqueTaskIds.size(), + activeTasks.size(), storedLocks.size() - taskLockCount ); } finally { @@ -170,10 +174,8 @@ public class TaskLockbox public TaskLock lock(final Task task, final Interval interval) throws InterruptedException { giant.lock(); - try { Optional taskLock; - while (!(taskLock = tryLock(task, interval)).isPresent()) { lockReleaseCondition.await(); } @@ -192,6 +194,7 @@ public class TaskLockbox * @param interval interval to lock * * @return lock version if lock was acquired, absent otherwise + * @throws IllegalStateException if the task is not a valid active task */ public Optional tryLock(final Task task, final Interval interval) { @@ -210,12 +213,16 @@ public class TaskLockbox * @param preferredVersion use this version string if one has not yet been assigned * * @return lock version if lock was acquired, absent otherwise + * @throws IllegalStateException if the task is not a valid active task */ private Optional tryLock(final Task task, final Interval interval, final Optional preferredVersion) { giant.lock(); try { + if(!activeTasks.contains(task.getId())){ + throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId()); + } Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty"); final String dataSource = task.getDataSource(); final List foundPosses = findLockPossesForInterval(dataSource, interval); @@ -310,13 +317,13 @@ public class TaskLockbox try { return Lists.transform( findLockPossesForTask(task), new Function() - { - @Override - public TaskLock apply(TaskLockPosse taskLockPosse) - { - return taskLockPosse.getTaskLock(); - } - } + { + @Override + public TaskLock apply(TaskLockPosse taskLockPosse) + { + return taskLockPosse.getTaskLock(); + } + } ); } finally { giant.unlock(); @@ -338,7 +345,7 @@ public class TaskLockbox final String dataSource = task.getDataSource(); final NavigableMap dsRunning = running.get(dataSource); - // So we can alert if tasks try to release stuff they don't have + // So we can alert if activeTasks try to release stuff they don't have boolean removed = false; if(dsRunning != null) { @@ -388,17 +395,22 @@ public class TaskLockbox } /** - * Release all locks for a task. Does nothing if the task is not currently locked. + * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task. * * @param task task to unlock */ - public void unlock(final Task task) + public void remove(final Task task) { giant.lock(); - try { - for(final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) { - unlock(task, taskLockPosse.getTaskLock().getInterval()); + try { + log.info("Removing task[%s] from activeTasks", task.getId()); + for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) { + unlock(task, taskLockPosse.getTaskLock().getInterval()); + } + } + finally { + activeTasks.remove(task.getId()); } } finally { @@ -503,6 +515,17 @@ public class TaskLockbox } } + public void add(Task task) + { + giant.lock(); + try { + log.info("Adding task[%s] to activeTasks", task.getId()); + activeTasks.add(task.getId()); + } finally { + giant.unlock(); + } + } + private static class TaskLockPosse { final private TaskLock taskLock; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 61b60644cfe..682f72c63b6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; @@ -44,6 +45,8 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.metadata.EntryExistsException; import io.druid.query.DruidMetrics; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,6 +56,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; /** * Interface between task producers and the task runner. @@ -308,7 +312,7 @@ public class TaskQueue // If this throws with any sort of exception, including TaskExistsException, we don't want to // insert the task into our queue. So don't catch it. taskStorage.insert(task, TaskStatus.running(task.getId())); - tasks.add(task); + addTaskInternal(task); managementMayBeNecessary.signalAll(); return true; } @@ -317,6 +321,18 @@ public class TaskQueue } } + // Should always be called after taking giantLock + private void addTaskInternal(final Task task){ + tasks.add(task); + taskLockbox.add(task); + } + + // Should always be called after taking giantLock + private void removeTaskInternal(final Task task){ + taskLockbox.remove(task); + tasks.remove(task); + } + /** * Shuts down a task if it has not yet finished. * @@ -378,7 +394,7 @@ public class TaskQueue for (int i = tasks.size() - 1; i >= 0; i--) { if (tasks.get(i).getId().equals(task.getId())) { removed++; - tasks.remove(i); + removeTaskInternal(tasks.get(i)); break; } } @@ -397,7 +413,6 @@ public class TaskQueue log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); } else { taskStorage.setStatus(taskStatus); - taskLockbox.unlock(task); log.info("Task done: %s", task); managementMayBeNecessary.signalAll(); } @@ -498,15 +513,35 @@ public class TaskQueue try { if (active) { - final List newTasks = taskStorage.getActiveTasks(); + final Map newTasks = toTaskIDMap(taskStorage.getActiveTasks()); + final int tasksSynced = newTasks.size(); + final Map oldTasks = toTaskIDMap(tasks); + + // Calculate differences on IDs instead of Task Objects. + Set commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet())); + for(String taskID : commonIds){ + newTasks.remove(taskID); + oldTasks.remove(taskID); + } + Collection addedTasks = newTasks.values(); + Collection removedTasks = oldTasks.values(); + + // Clean up removed Tasks + for(Task task : removedTasks){ + removeTaskInternal(task); + } + + // Add newly Added tasks to the queue + for(Task task : addedTasks){ + addTaskInternal(task); + } + log.info( - "Synced %,d tasks from storage (%,d tasks added, %,d tasks removed).", - newTasks.size(), - Sets.difference(Sets.newHashSet(newTasks), Sets.newHashSet(tasks)).size(), - Sets.difference(Sets.newHashSet(tasks), Sets.newHashSet(newTasks)).size() + "Synced %d tasks from storage (%d tasks added, %d tasks removed).", + tasksSynced, + addedTasks.size(), + removedTasks.size() ); - tasks.clear(); - tasks.addAll(newTasks); managementMayBeNecessary.signalAll(); } else { log.info("Not active. Skipping storage sync."); @@ -520,4 +555,13 @@ public class TaskQueue giant.unlock(); } } + + private static Map toTaskIDMap(List taskList){ + Map rv = Maps.newHashMap(); + for(Task task : taskList){ + rv.put(task.getId(), task); + } + return rv; + } + } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java new file mode 100644 index 00000000000..bca630e2957 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -0,0 +1,100 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.indexing.overlord; + + +import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.Task; +import junit.framework.Assert; +import org.joda.time.Interval; +import org.junit.Before; +import org.junit.Test; + +public class TaskLockboxTest +{ + private TaskStorage taskStorage; + + private TaskLockbox lockbox; + + @Before + public void setUp(){ + taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); + lockbox = new TaskLockbox(taskStorage); + } + + @Test + public void testLock() throws InterruptedException + { + Task task = NoopTask.create(); + lockbox.add(task); + Assert.assertNotNull(lockbox.lock(task, new Interval("2015-01-01/2015-01-02"))); + } + + @Test(expected = IllegalStateException.class) + public void testLockForInactiveTask() throws InterruptedException + { + lockbox.lock(NoopTask.create(),new Interval("2015-01-01/2015-01-02")); + } + + @Test(expected = IllegalStateException.class) + public void testLockAfterTaskComplete() throws InterruptedException + { + Task task = NoopTask.create(); + lockbox.add(task); + lockbox.remove(task); + lockbox.lock(task, new Interval("2015-01-01/2015-01-02")); + } + + @Test + public void testTryLock() throws InterruptedException + { + Task task = NoopTask.create(); + lockbox.add(task); + Assert.assertTrue(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03")).isPresent()); + + // try to take lock for task 2 for overlapping interval + Task task2 = NoopTask.create(); + lockbox.add(task2); + Assert.assertFalse(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent()); + + // task 1 unlocks the lock + lockbox.remove(task); + + // Now task2 should be able to get the lock + Assert.assertTrue(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent()); + + } + + @Test(expected = IllegalStateException.class) + public void testTryLockForInactiveTask() throws InterruptedException + { + Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent()); + } + + @Test(expected = IllegalStateException.class) + public void testTryLockAfterTaskComplete() throws InterruptedException + { + Task task = NoopTask.create(); + lockbox.add(task); + lockbox.remove(task); + Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); } + + + +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index f455bbe015f..ec20523fe7b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -118,8 +118,17 @@ public class OverlordResourceTest taskLockbox = EasyMock.createStrictMock(TaskLockbox.class); taskLockbox.syncFromStorage(); EasyMock.expectLastCall().atLeastOnce(); - taskLockbox.unlock(EasyMock.anyObject()); + taskLockbox.add(EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); + taskLockbox.remove(EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + + // for second Noop Task directly added to deep storage. + taskLockbox.add(EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + taskLockbox.remove(EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + taskActionClientFactory = EasyMock.createStrictMock(TaskActionClientFactory.class); EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())) .andReturn(null).anyTimes(); @@ -226,6 +235,7 @@ public class OverlordResourceTest waitForTaskStatus(taskId_0, TaskStatus.Status.SUCCESS); // Manually insert task in taskStorage + // Verifies sync from storage final String taskId_1 = "1"; NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null, null); taskStorage.insert(task_1, TaskStatus.running(taskId_1));