diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index be43cc1e3e6..d949faa7638 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -26,6 +26,7 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; @@ -40,23 +41,25 @@ import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** - * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class is not - * thread safe. + * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class + * is thread safe. */ public class HeapMemoryTaskStorage implements TaskStorage { private final TaskStorageConfig config; - private final ReentrantLock giant = new ReentrantLock(); - private final Map tasks = new HashMap<>(); + private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); + + @GuardedBy("itself") private final Multimap taskLocks = HashMultimap.create(); + @GuardedBy("itself") private final Multimap taskActions = ArrayListMultimap.create(); private static final Logger log = new Logger(HeapMemoryTaskStorage.class); @@ -70,82 +73,59 @@ public class HeapMemoryTaskStorage implements TaskStorage @Override public void insert(Task task, TaskStatus status) throws EntryExistsException { - giant.lock(); + Preconditions.checkNotNull(task, "task"); + Preconditions.checkNotNull(status, "status"); + Preconditions.checkArgument( + task.getId().equals(status.getId()), + "Task/Status ID mismatch[%s/%s]", + task.getId(), + status.getId() + ); - try { - Preconditions.checkNotNull(task, "task"); - Preconditions.checkNotNull(status, "status"); - Preconditions.checkArgument( - task.getId().equals(status.getId()), - "Task/Status ID mismatch[%s/%s]", - task.getId(), - status.getId() - ); - - if (tasks.containsKey(task.getId())) { - throw new EntryExistsException(task.getId()); - } - - log.info("Inserting task %s with status: %s", task.getId(), status); - tasks.put(task.getId(), new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource())); - } - finally { - giant.unlock(); + TaskStuff newTaskStuff = new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource()); + TaskStuff alreadyExisted = tasks.putIfAbsent(task.getId(), newTaskStuff); + if (alreadyExisted != null) { + throw new EntryExistsException(task.getId()); } + + log.info("Inserted task %s with status: %s", task.getId(), status); } @Override public Optional getTask(String taskid) { - giant.lock(); - - try { - Preconditions.checkNotNull(taskid, "taskid"); - if (tasks.containsKey(taskid)) { - return Optional.of(tasks.get(taskid).getTask()); - } else { - return Optional.absent(); - } - } - finally { - giant.unlock(); + Preconditions.checkNotNull(taskid, "taskid"); + TaskStuff taskStuff = tasks.get(taskid); + if (taskStuff != null) { + return Optional.of(taskStuff.getTask()); + } else { + return Optional.absent(); } } @Override public void setStatus(TaskStatus status) { - giant.lock(); + Preconditions.checkNotNull(status, "status"); + final String taskid = status.getId(); - try { - Preconditions.checkNotNull(status, "status"); - - final String taskid = status.getId(); - Preconditions.checkState(tasks.containsKey(taskid), "Task ID must already be present: %s", taskid); - Preconditions.checkState(tasks.get(taskid).getStatus().isRunnable(), "Task status must be runnable: %s", taskid); - log.info("Updating task %s to status: %s", taskid, status); - tasks.put(taskid, tasks.get(taskid).withStatus(status)); - } - finally { - giant.unlock(); - } + log.info("Updating task %s to status: %s", taskid, status); + TaskStuff updated = tasks.computeIfPresent(taskid, (tid, taskStuff) -> { + Preconditions.checkState(taskStuff.getStatus().isRunnable(), "Task must be runnable: %s", taskid); + return taskStuff.withStatus(status); + }); + Preconditions.checkNotNull(updated, "Task ID must already be present: %s", taskid); } @Override public Optional getStatus(String taskid) { - giant.lock(); - - try { - Preconditions.checkNotNull(taskid, "taskid"); - if (tasks.containsKey(taskid)) { - return Optional.of(tasks.get(taskid).getStatus()); - } else { - return Optional.absent(); - } - } - finally { - giant.unlock(); + Preconditions.checkNotNull(taskid, "taskid"); + TaskStuff existing = tasks.get(taskid); + if (existing != null) { + return Optional.of(existing.getStatus()); + } else { + return Optional.absent(); } } @@ -153,90 +133,50 @@ public class HeapMemoryTaskStorage implements TaskStorage @Override public TaskInfo getTaskInfo(String taskId) { - giant.lock(); - - try { - Preconditions.checkNotNull(taskId, "taskId"); - final TaskStuff taskStuff = tasks.get(taskId); - if (taskStuff != null) { - return new TaskInfo<>( - taskStuff.getTask().getId(), - taskStuff.getCreatedDate(), - taskStuff.getStatus(), - taskStuff.getDataSource(), - taskStuff.getTask() - ); - } else { - return null; - } - } - finally { - giant.unlock(); + Preconditions.checkNotNull(taskId, "taskId"); + final TaskStuff taskStuff = tasks.get(taskId); + if (taskStuff != null) { + return TaskStuff.toTaskInfo(taskStuff); + } else { + return null; } } @Override public List getActiveTasks() { - giant.lock(); - - try { - final ImmutableList.Builder listBuilder = ImmutableList.builder(); - for (final TaskStuff taskStuff : tasks.values()) { - if (taskStuff.getStatus().isRunnable()) { - listBuilder.add(taskStuff.getTask()); - } + final ImmutableList.Builder listBuilder = ImmutableList.builder(); + for (final TaskStuff taskStuff : tasks.values()) { + if (taskStuff.getStatus().isRunnable()) { + listBuilder.add(taskStuff.getTask()); } - return listBuilder.build(); - } - finally { - giant.unlock(); } + return listBuilder.build(); } @Override public List getActiveTasksByDatasource(String datasource) { - giant.lock(); - - try { - final ImmutableList.Builder listBuilder = ImmutableList.builder(); - for (Map.Entry entry : tasks.entrySet()) { - if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) { - listBuilder.add(entry.getValue().getTask()); - } + final ImmutableList.Builder listBuilder = ImmutableList.builder(); + for (Map.Entry entry : tasks.entrySet()) { + if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) { + listBuilder.add(entry.getValue().getTask()); } - return listBuilder.build(); - } - finally { - giant.unlock(); } + return listBuilder.build(); } @Override public List> getActiveTaskInfo(@Nullable String dataSource) { - giant.lock(); - - try { - final ImmutableList.Builder> listBuilder = ImmutableList.builder(); - for (final TaskStuff taskStuff : tasks.values()) { - if (taskStuff.getStatus().isRunnable()) { - TaskInfo t = new TaskInfo<>( - taskStuff.getTask().getId(), - taskStuff.getCreatedDate(), - taskStuff.getStatus(), - taskStuff.getDataSource(), - taskStuff.getTask() - ); - listBuilder.add(t); - } + final ImmutableList.Builder> listBuilder = ImmutableList.builder(); + for (final TaskStuff taskStuff : tasks.values()) { + if (taskStuff.getStatus().isRunnable()) { + TaskInfo t = TaskStuff.toTaskInfo(taskStuff); + listBuilder.add(t); } - return listBuilder.build(); - } - finally { - giant.unlock(); } + return listBuilder.build(); } @Override @@ -246,29 +186,22 @@ public class HeapMemoryTaskStorage implements TaskStorage @Nullable String datasource ) { - giant.lock(); - - try { - final Ordering createdDateDesc = new Ordering() + final Ordering createdDateDesc = new Ordering() + { + @Override + public int compare(TaskStuff a, TaskStuff b) { - @Override - public int compare(TaskStuff a, TaskStuff b) - { - return a.getCreatedDate().compareTo(b.getCreatedDate()); - } - }.reverse(); + return a.getCreatedDate().compareTo(b.getCreatedDate()); + } + }.reverse(); - return maxTaskStatuses == null ? - getRecentlyCreatedAlreadyFinishedTaskInfoSince( - DateTimes.nowUtc() - .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), - createdDateDesc - ) : - getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); - } - finally { - giant.unlock(); - } + return maxTaskStatuses == null ? + getRecentlyCreatedAlreadyFinishedTaskInfoSince( + DateTimes.nowUtc() + .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), + createdDateDesc + ) : + getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); } private List> getRecentlyCreatedAlreadyFinishedTaskInfoSince( @@ -276,31 +209,13 @@ public class HeapMemoryTaskStorage implements TaskStorage Ordering createdDateDesc ) { - giant.lock(); - - try { - List list = createdDateDesc - .sortedCopy(tasks.values()) - .stream() - .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start)) - .collect(Collectors.toList()); - final ImmutableList.Builder> listBuilder = ImmutableList.builder(); - for (final TaskStuff taskStuff : list) { - String id = taskStuff.getTask().getId(); - TaskInfo t = new TaskInfo<>( - id, - taskStuff.getCreatedDate(), - taskStuff.getStatus(), - taskStuff.getDataSource(), - taskStuff.getTask() - ); - listBuilder.add(t); - } - return listBuilder.build(); - } - finally { - giant.unlock(); - } + List> list = tasks.values() + .stream() + .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().isAfter(start)) + .sorted(createdDateDesc) + .map(TaskStuff::toTaskInfo) + .collect(Collectors.toList()); + return Collections.unmodifiableList(list); } private List> getNRecentlyCreatedAlreadyFinishedTaskInfo( @@ -308,114 +223,77 @@ public class HeapMemoryTaskStorage implements TaskStorage Ordering createdDateDesc ) { - giant.lock(); - - try { - List list = createdDateDesc - .sortedCopy(tasks.values()) - .stream() - .filter(taskStuff -> taskStuff.getStatus().isComplete()) - .limit(n) - .collect(Collectors.toList()); - final ImmutableList.Builder> listBuilder = ImmutableList.builder(); - for (final TaskStuff taskStuff : list) { - String id = taskStuff.getTask().getId(); - TaskInfo t = new TaskInfo<>( - id, - taskStuff.getCreatedDate(), - taskStuff.getStatus(), - taskStuff.getDataSource(), - taskStuff.getTask() - ); - listBuilder.add(t); - } - return listBuilder.build(); - } - finally { - giant.unlock(); - } + List> list = tasks.values() + .stream() + .filter(taskStuff -> taskStuff.getStatus().isComplete()) + .sorted(createdDateDesc) + .limit(n) + .map(TaskStuff::toTaskInfo) + .collect(Collectors.toList()); + return Collections.unmodifiableList(list); } @Override public void addLock(final String taskid, final TaskLock taskLock) { - giant.lock(); - - try { - Preconditions.checkNotNull(taskid, "taskid"); - Preconditions.checkNotNull(taskLock, "taskLock"); + Preconditions.checkNotNull(taskid, "taskid"); + Preconditions.checkNotNull(taskLock, "taskLock"); + synchronized (taskLocks) { taskLocks.put(taskid, taskLock); } - finally { - giant.unlock(); - } } @Override public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock) { - giant.lock(); - - try { - Preconditions.checkNotNull(taskid, "taskid"); - Preconditions.checkNotNull(oldLock, "oldLock"); - Preconditions.checkNotNull(newLock, "newLock"); + Preconditions.checkNotNull(taskid, "taskid"); + Preconditions.checkNotNull(oldLock, "oldLock"); + Preconditions.checkNotNull(newLock, "newLock"); + synchronized (taskLocks) { if (!taskLocks.remove(taskid, oldLock)) { log.warn("taskLock[%s] for replacement is not found for task[%s]", oldLock, taskid); } taskLocks.put(taskid, newLock); } - finally { - giant.unlock(); - } } @Override public void removeLock(final String taskid, final TaskLock taskLock) { - giant.lock(); - - try { - Preconditions.checkNotNull(taskLock, "taskLock"); + Preconditions.checkNotNull(taskLock, "taskLock"); + synchronized (taskLocks) { taskLocks.remove(taskid, taskLock); } - finally { - giant.unlock(); - } - } - - @Override - public void removeTasksOlderThan(final long timestamp) - { - giant.lock(); - - try { - List taskIds = tasks.entrySet().stream() - .filter(entry -> entry.getValue().getStatus().isComplete() - && entry.getValue().getCreatedDate().isBefore(timestamp)) - .map(entry -> entry.getKey()) - .collect(Collectors.toList()); - - taskIds.forEach(taskActions::removeAll); - taskIds.forEach(tasks::remove); - } - finally { - giant.unlock(); - } } @Override public List getLocks(final String taskid) { - giant.lock(); - - try { + synchronized (taskLocks) { return ImmutableList.copyOf(taskLocks.get(taskid)); } - finally { - giant.unlock(); + } + + @Override + public void removeTasksOlderThan(final long timestamp) + { + // This is the only fn where both tasks & taskActions are modified for removal, they may + // be added elsewhere. + + // It is possible that multiple calls here occur to removeTasksOlderThan() concurrently. + // It is then possible that the same task will be queued for removal twice. Whilst not ideal, + // it will not cause any problems. + List taskIds = tasks.entrySet().stream() + .filter(entry -> entry.getValue().getStatus().isComplete() + && entry.getValue().getCreatedDate().isBefore(timestamp)) + .map(entry -> entry.getKey()) + .collect(Collectors.toList()); + + taskIds.forEach(tasks::remove); + synchronized (taskActions) { + taskIds.forEach(taskActions::removeAll); } } @@ -423,28 +301,18 @@ public class HeapMemoryTaskStorage implements TaskStorage @Override public void addAuditLog(Task task, TaskAction taskAction) { - giant.lock(); - - try { + synchronized (taskActions) { taskActions.put(task.getId(), taskAction); } - finally { - giant.unlock(); - } } @Deprecated @Override public List getAuditLogs(String taskid) { - giant.lock(); - - try { + synchronized (taskActions) { return ImmutableList.copyOf(taskActions.get(taskid)); } - finally { - giant.unlock(); - } } private static class TaskStuff @@ -488,5 +356,16 @@ public class HeapMemoryTaskStorage implements TaskStorage { return new TaskStuff(task, _status, createdDate, dataSource); } + + static TaskInfo toTaskInfo(TaskStuff taskStuff) + { + return new TaskInfo<>( + taskStuff.getTask().getId(), + taskStuff.getCreatedDate(), + taskStuff.getStatus(), + taskStuff.getDataSource(), + taskStuff.getTask() + ); + } } }