mirror of https://github.com/apache/druid.git
Don't need to double synchronize on simple map operations (#14435)
* Don't need to double syncronize on simple map operations * remove lock
This commit is contained in:
parent
04fb75719e
commit
bd07c3dd43
|
@ -134,18 +134,16 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
|||
@Override
|
||||
public ListenableFuture<TaskStatus> run(Task task)
|
||||
{
|
||||
synchronized (tasks) {
|
||||
tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))));
|
||||
return tasks.get(task.getId()).getResult();
|
||||
}
|
||||
return tasks.computeIfAbsent(
|
||||
task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))
|
||||
).getResult();
|
||||
}
|
||||
|
||||
protected ListenableFuture<TaskStatus> joinAsync(Task task)
|
||||
{
|
||||
synchronized (tasks) {
|
||||
tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))));
|
||||
return tasks.get(task.getId()).getResult();
|
||||
}
|
||||
return tasks.computeIfAbsent(
|
||||
task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))
|
||||
).getResult();
|
||||
}
|
||||
|
||||
private TaskStatus runTask(Task task)
|
||||
|
@ -163,20 +161,18 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
|||
{
|
||||
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
|
||||
|
||||
synchronized (tasks) {
|
||||
KubernetesWorkItem workItem = tasks.get(task.getId());
|
||||
KubernetesWorkItem workItem = tasks.get(task.getId());
|
||||
|
||||
if (workItem == null) {
|
||||
throw new ISE("Task [%s] disappeared", task.getId());
|
||||
}
|
||||
|
||||
if (workItem.isShutdownRequested()) {
|
||||
throw new ISE("Task [%s] has been shut down", task.getId());
|
||||
}
|
||||
|
||||
workItem.setKubernetesPeonLifecycle(peonLifecycle);
|
||||
if (workItem == null) {
|
||||
throw new ISE("Task [%s] disappeared", task.getId());
|
||||
}
|
||||
|
||||
if (workItem.isShutdownRequested()) {
|
||||
throw new ISE("Task [%s] has been shut down", task.getId());
|
||||
}
|
||||
|
||||
workItem.setKubernetesPeonLifecycle(peonLifecycle);
|
||||
|
||||
try {
|
||||
TaskStatus taskStatus;
|
||||
if (run) {
|
||||
|
@ -202,9 +198,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
|||
}
|
||||
|
||||
finally {
|
||||
synchronized (tasks) {
|
||||
tasks.remove(task.getId());
|
||||
}
|
||||
tasks.remove(task.getId());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -322,9 +316,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
|||
@Override
|
||||
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
|
||||
{
|
||||
synchronized (tasks) {
|
||||
return Lists.newArrayList(tasks.values());
|
||||
}
|
||||
return Lists.newArrayList(tasks.values());
|
||||
}
|
||||
|
||||
|
||||
|
@ -393,23 +385,19 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
|||
@Override
|
||||
public Collection<TaskRunnerWorkItem> getRunningTasks()
|
||||
{
|
||||
synchronized (tasks) {
|
||||
return tasks.values()
|
||||
.stream()
|
||||
.filter(KubernetesWorkItem::isRunning)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
return tasks.values()
|
||||
.stream()
|
||||
.filter(KubernetesWorkItem::isRunning)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<TaskRunnerWorkItem> getPendingTasks()
|
||||
{
|
||||
synchronized (tasks) {
|
||||
return tasks.values()
|
||||
.stream()
|
||||
.filter(KubernetesWorkItem::isPending)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
return tasks.values()
|
||||
.stream()
|
||||
.filter(KubernetesWorkItem::isPending)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
|
Loading…
Reference in New Issue