mirror of https://github.com/apache/druid.git
perf: improve concurrency and improve perf for task query in HeapMemoryTaskStorage (#11272)
* perf: improve concurrency and reduce algorithmic cost for task querying in HeapMemoryTaskStorage
* fix: address intellij linter concern regarding use of ConcurrentMap interface
* nit: document thread safety of HeapMemoryTaskStorage
* empty to trigger ci
This commit is contained in:
parent
a9c4b478ab
commit
d3220693f3
|
@ -26,6 +26,7 @@ import com.google.common.collect.HashMultimap;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.errorprone.annotations.concurrent.GuardedBy;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.indexer.TaskInfo;
|
||||
import org.apache.druid.indexer.TaskStatus;
|
||||
|
@ -40,23 +41,25 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.Duration;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Implements an in-heap TaskStorage facility, with no persistence across restarts. This class is not
|
||||
* thread safe.
|
||||
* Implements an in-heap TaskStorage facility, with no persistence across restarts. This class
|
||||
* is thread safe.
|
||||
*/
|
||||
public class HeapMemoryTaskStorage implements TaskStorage
|
||||
{
|
||||
private final TaskStorageConfig config;
|
||||
|
||||
private final ReentrantLock giant = new ReentrantLock();
|
||||
private final Map<String, TaskStuff> tasks = new HashMap<>();
|
||||
private final ConcurrentHashMap<String, TaskStuff> tasks = new ConcurrentHashMap<>();
|
||||
|
||||
@GuardedBy("itself")
|
||||
private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
|
||||
@GuardedBy("itself")
|
||||
private final Multimap<String, TaskAction> taskActions = ArrayListMultimap.create();
|
||||
|
||||
private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
|
||||
|
@ -70,82 +73,59 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
@Override
|
||||
public void insert(Task task, TaskStatus status) throws EntryExistsException
|
||||
{
|
||||
giant.lock();
|
||||
Preconditions.checkNotNull(task, "task");
|
||||
Preconditions.checkNotNull(status, "status");
|
||||
Preconditions.checkArgument(
|
||||
task.getId().equals(status.getId()),
|
||||
"Task/Status ID mismatch[%s/%s]",
|
||||
task.getId(),
|
||||
status.getId()
|
||||
);
|
||||
|
||||
try {
|
||||
Preconditions.checkNotNull(task, "task");
|
||||
Preconditions.checkNotNull(status, "status");
|
||||
Preconditions.checkArgument(
|
||||
task.getId().equals(status.getId()),
|
||||
"Task/Status ID mismatch[%s/%s]",
|
||||
task.getId(),
|
||||
status.getId()
|
||||
);
|
||||
|
||||
if (tasks.containsKey(task.getId())) {
|
||||
throw new EntryExistsException(task.getId());
|
||||
}
|
||||
|
||||
log.info("Inserting task %s with status: %s", task.getId(), status);
|
||||
tasks.put(task.getId(), new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource()));
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
TaskStuff newTaskStuff = new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource());
|
||||
TaskStuff alreadyExisted = tasks.putIfAbsent(task.getId(), newTaskStuff);
|
||||
if (alreadyExisted != null) {
|
||||
throw new EntryExistsException(task.getId());
|
||||
}
|
||||
|
||||
log.info("Inserted task %s with status: %s", task.getId(), status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Task> getTask(String taskid)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
Preconditions.checkNotNull(taskid, "taskid");
|
||||
if (tasks.containsKey(taskid)) {
|
||||
return Optional.of(tasks.get(taskid).getTask());
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
Preconditions.checkNotNull(taskid, "taskid");
|
||||
TaskStuff taskStuff = tasks.get(taskid);
|
||||
if (taskStuff != null) {
|
||||
return Optional.of(taskStuff.getTask());
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setStatus(TaskStatus status)
|
||||
{
|
||||
giant.lock();
|
||||
Preconditions.checkNotNull(status, "status");
|
||||
final String taskid = status.getId();
|
||||
|
||||
try {
|
||||
Preconditions.checkNotNull(status, "status");
|
||||
|
||||
final String taskid = status.getId();
|
||||
Preconditions.checkState(tasks.containsKey(taskid), "Task ID must already be present: %s", taskid);
|
||||
Preconditions.checkState(tasks.get(taskid).getStatus().isRunnable(), "Task status must be runnable: %s", taskid);
|
||||
log.info("Updating task %s to status: %s", taskid, status);
|
||||
tasks.put(taskid, tasks.get(taskid).withStatus(status));
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
log.info("Updating task %s to status: %s", taskid, status);
|
||||
TaskStuff updated = tasks.computeIfPresent(taskid, (tid, taskStuff) -> {
|
||||
Preconditions.checkState(taskStuff.getStatus().isRunnable(), "Task must be runnable: %s", taskid);
|
||||
return taskStuff.withStatus(status);
|
||||
});
|
||||
Preconditions.checkNotNull(updated, "Task ID must already be present: %s", taskid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<TaskStatus> getStatus(String taskid)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
Preconditions.checkNotNull(taskid, "taskid");
|
||||
if (tasks.containsKey(taskid)) {
|
||||
return Optional.of(tasks.get(taskid).getStatus());
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
Preconditions.checkNotNull(taskid, "taskid");
|
||||
TaskStuff existing = tasks.get(taskid);
|
||||
if (existing != null) {
|
||||
return Optional.of(existing.getStatus());
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -153,90 +133,50 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
@Override
|
||||
public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
Preconditions.checkNotNull(taskId, "taskId");
|
||||
final TaskStuff taskStuff = tasks.get(taskId);
|
||||
if (taskStuff != null) {
|
||||
return new TaskInfo<>(
|
||||
taskStuff.getTask().getId(),
|
||||
taskStuff.getCreatedDate(),
|
||||
taskStuff.getStatus(),
|
||||
taskStuff.getDataSource(),
|
||||
taskStuff.getTask()
|
||||
);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
Preconditions.checkNotNull(taskId, "taskId");
|
||||
final TaskStuff taskStuff = tasks.get(taskId);
|
||||
if (taskStuff != null) {
|
||||
return TaskStuff.toTaskInfo(taskStuff);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Task> getActiveTasks()
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
|
||||
for (final TaskStuff taskStuff : tasks.values()) {
|
||||
if (taskStuff.getStatus().isRunnable()) {
|
||||
listBuilder.add(taskStuff.getTask());
|
||||
}
|
||||
final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
|
||||
for (final TaskStuff taskStuff : tasks.values()) {
|
||||
if (taskStuff.getStatus().isRunnable()) {
|
||||
listBuilder.add(taskStuff.getTask());
|
||||
}
|
||||
return listBuilder.build();
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
return listBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Task> getActiveTasksByDatasource(String datasource)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
|
||||
for (Map.Entry<String, TaskStuff> entry : tasks.entrySet()) {
|
||||
if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) {
|
||||
listBuilder.add(entry.getValue().getTask());
|
||||
}
|
||||
final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
|
||||
for (Map.Entry<String, TaskStuff> entry : tasks.entrySet()) {
|
||||
if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) {
|
||||
listBuilder.add(entry.getValue().getTask());
|
||||
}
|
||||
return listBuilder.build();
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
return listBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
|
||||
for (final TaskStuff taskStuff : tasks.values()) {
|
||||
if (taskStuff.getStatus().isRunnable()) {
|
||||
TaskInfo t = new TaskInfo<>(
|
||||
taskStuff.getTask().getId(),
|
||||
taskStuff.getCreatedDate(),
|
||||
taskStuff.getStatus(),
|
||||
taskStuff.getDataSource(),
|
||||
taskStuff.getTask()
|
||||
);
|
||||
listBuilder.add(t);
|
||||
}
|
||||
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
|
||||
for (final TaskStuff taskStuff : tasks.values()) {
|
||||
if (taskStuff.getStatus().isRunnable()) {
|
||||
TaskInfo t = TaskStuff.toTaskInfo(taskStuff);
|
||||
listBuilder.add(t);
|
||||
}
|
||||
return listBuilder.build();
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
return listBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -246,29 +186,22 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
@Nullable String datasource
|
||||
)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
|
||||
final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
|
||||
{
|
||||
@Override
|
||||
public int compare(TaskStuff a, TaskStuff b)
|
||||
{
|
||||
@Override
|
||||
public int compare(TaskStuff a, TaskStuff b)
|
||||
{
|
||||
return a.getCreatedDate().compareTo(b.getCreatedDate());
|
||||
}
|
||||
}.reverse();
|
||||
return a.getCreatedDate().compareTo(b.getCreatedDate());
|
||||
}
|
||||
}.reverse();
|
||||
|
||||
return maxTaskStatuses == null ?
|
||||
getRecentlyCreatedAlreadyFinishedTaskInfoSince(
|
||||
DateTimes.nowUtc()
|
||||
.minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
|
||||
createdDateDesc
|
||||
) :
|
||||
getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
return maxTaskStatuses == null ?
|
||||
getRecentlyCreatedAlreadyFinishedTaskInfoSince(
|
||||
DateTimes.nowUtc()
|
||||
.minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
|
||||
createdDateDesc
|
||||
) :
|
||||
getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
|
||||
}
|
||||
|
||||
private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
|
||||
|
@ -276,31 +209,13 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
Ordering<TaskStuff> createdDateDesc
|
||||
)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
List<TaskStuff> list = createdDateDesc
|
||||
.sortedCopy(tasks.values())
|
||||
.stream()
|
||||
.filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start))
|
||||
.collect(Collectors.toList());
|
||||
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
|
||||
for (final TaskStuff taskStuff : list) {
|
||||
String id = taskStuff.getTask().getId();
|
||||
TaskInfo t = new TaskInfo<>(
|
||||
id,
|
||||
taskStuff.getCreatedDate(),
|
||||
taskStuff.getStatus(),
|
||||
taskStuff.getDataSource(),
|
||||
taskStuff.getTask()
|
||||
);
|
||||
listBuilder.add(t);
|
||||
}
|
||||
return listBuilder.build();
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
List<TaskInfo<Task, TaskStatus>> list = tasks.values()
|
||||
.stream()
|
||||
.filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().isAfter(start))
|
||||
.sorted(createdDateDesc)
|
||||
.map(TaskStuff::toTaskInfo)
|
||||
.collect(Collectors.toList());
|
||||
return Collections.unmodifiableList(list);
|
||||
}
|
||||
|
||||
private List<TaskInfo<Task, TaskStatus>> getNRecentlyCreatedAlreadyFinishedTaskInfo(
|
||||
|
@ -308,114 +223,77 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
Ordering<TaskStuff> createdDateDesc
|
||||
)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
List<TaskStuff> list = createdDateDesc
|
||||
.sortedCopy(tasks.values())
|
||||
.stream()
|
||||
.filter(taskStuff -> taskStuff.getStatus().isComplete())
|
||||
.limit(n)
|
||||
.collect(Collectors.toList());
|
||||
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
|
||||
for (final TaskStuff taskStuff : list) {
|
||||
String id = taskStuff.getTask().getId();
|
||||
TaskInfo t = new TaskInfo<>(
|
||||
id,
|
||||
taskStuff.getCreatedDate(),
|
||||
taskStuff.getStatus(),
|
||||
taskStuff.getDataSource(),
|
||||
taskStuff.getTask()
|
||||
);
|
||||
listBuilder.add(t);
|
||||
}
|
||||
return listBuilder.build();
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
List<TaskInfo<Task, TaskStatus>> list = tasks.values()
|
||||
.stream()
|
||||
.filter(taskStuff -> taskStuff.getStatus().isComplete())
|
||||
.sorted(createdDateDesc)
|
||||
.limit(n)
|
||||
.map(TaskStuff::toTaskInfo)
|
||||
.collect(Collectors.toList());
|
||||
return Collections.unmodifiableList(list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLock(final String taskid, final TaskLock taskLock)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
Preconditions.checkNotNull(taskid, "taskid");
|
||||
Preconditions.checkNotNull(taskLock, "taskLock");
|
||||
Preconditions.checkNotNull(taskid, "taskid");
|
||||
Preconditions.checkNotNull(taskLock, "taskLock");
|
||||
synchronized (taskLocks) {
|
||||
taskLocks.put(taskid, taskLock);
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
Preconditions.checkNotNull(taskid, "taskid");
|
||||
Preconditions.checkNotNull(oldLock, "oldLock");
|
||||
Preconditions.checkNotNull(newLock, "newLock");
|
||||
Preconditions.checkNotNull(taskid, "taskid");
|
||||
Preconditions.checkNotNull(oldLock, "oldLock");
|
||||
Preconditions.checkNotNull(newLock, "newLock");
|
||||
|
||||
synchronized (taskLocks) {
|
||||
if (!taskLocks.remove(taskid, oldLock)) {
|
||||
log.warn("taskLock[%s] for replacement is not found for task[%s]", oldLock, taskid);
|
||||
}
|
||||
|
||||
taskLocks.put(taskid, newLock);
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeLock(final String taskid, final TaskLock taskLock)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
Preconditions.checkNotNull(taskLock, "taskLock");
|
||||
Preconditions.checkNotNull(taskLock, "taskLock");
|
||||
synchronized (taskLocks) {
|
||||
taskLocks.remove(taskid, taskLock);
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeTasksOlderThan(final long timestamp)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
List<String> taskIds = tasks.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().getStatus().isComplete()
|
||||
&& entry.getValue().getCreatedDate().isBefore(timestamp))
|
||||
.map(entry -> entry.getKey())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
taskIds.forEach(taskActions::removeAll);
|
||||
taskIds.forEach(tasks::remove);
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TaskLock> getLocks(final String taskid)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
synchronized (taskLocks) {
|
||||
return ImmutableList.copyOf(taskLocks.get(taskid));
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeTasksOlderThan(final long timestamp)
|
||||
{
|
||||
// This is the only fn where both tasks & taskActions are modified for removal, they may
|
||||
// be added elsewhere.
|
||||
|
||||
// It is possible that multiple calls here occur to removeTasksOlderThan() concurrently.
|
||||
// It is then possible that the same task will be queued for removal twice. Whilst not ideal,
|
||||
// it will not cause any problems.
|
||||
List<String> taskIds = tasks.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().getStatus().isComplete()
|
||||
&& entry.getValue().getCreatedDate().isBefore(timestamp))
|
||||
.map(entry -> entry.getKey())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
taskIds.forEach(tasks::remove);
|
||||
synchronized (taskActions) {
|
||||
taskIds.forEach(taskActions::removeAll);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -423,28 +301,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
@Override
|
||||
public <T> void addAuditLog(Task task, TaskAction<T> taskAction)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
synchronized (taskActions) {
|
||||
taskActions.put(task.getId(), taskAction);
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@Override
|
||||
public List<TaskAction> getAuditLogs(String taskid)
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
try {
|
||||
synchronized (taskActions) {
|
||||
return ImmutableList.copyOf(taskActions.get(taskid));
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private static class TaskStuff
|
||||
|
@ -488,5 +356,16 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
{
|
||||
return new TaskStuff(task, _status, createdDate, dataSource);
|
||||
}
|
||||
|
||||
static TaskInfo<Task, TaskStatus> toTaskInfo(TaskStuff taskStuff)
|
||||
{
|
||||
return new TaskInfo<>(
|
||||
taskStuff.getTask().getId(),
|
||||
taskStuff.getCreatedDate(),
|
||||
taskStuff.getStatus(),
|
||||
taskStuff.getDataSource(),
|
||||
taskStuff.getTask()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue