From bd07c3dd43428608678e337c7935f237a64f533d Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Sat, 17 Jun 2023 20:30:37 -0400 Subject: [PATCH] Don't need to double synchronize on simple map operations (#14435) * Don't need to double syncronize on simple map operations * remove lock --- .../k8s/overlord/KubernetesTaskRunner.java | 62 ++++++++----------- 1 file changed, 25 insertions(+), 37 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index fe2f4be3711..30bea70416b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -134,18 +134,16 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner @Override public ListenableFuture run(Task task) { - synchronized (tasks) { - tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))); - return tasks.get(task.getId()).getResult(); - } + return tasks.computeIfAbsent( + task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))) + ).getResult(); } protected ListenableFuture joinAsync(Task task) { - synchronized (tasks) { - tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))); - return tasks.get(task.getId()).getResult(); - } + return tasks.computeIfAbsent( + task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))) + ).getResult(); } private TaskStatus runTask(Task task) @@ -163,20 +161,18 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); - synchronized (tasks) { - KubernetesWorkItem workItem = tasks.get(task.getId()); + KubernetesWorkItem workItem = tasks.get(task.getId()); - if (workItem == null) { - throw new ISE("Task [%s] disappeared", task.getId()); - } - - if (workItem.isShutdownRequested()) { - throw new ISE("Task [%s] has been shut down", task.getId()); - } - - workItem.setKubernetesPeonLifecycle(peonLifecycle); + if (workItem == null) { + throw new ISE("Task [%s] disappeared", task.getId()); } + if (workItem.isShutdownRequested()) { + throw new ISE("Task [%s] has been shut down", task.getId()); + } + + workItem.setKubernetesPeonLifecycle(peonLifecycle); + try { TaskStatus taskStatus; if (run) { @@ -202,9 +198,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner } finally { - synchronized (tasks) { - tasks.remove(task.getId()); - } + tasks.remove(task.getId()); } } @@ -322,9 +316,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner @Override public Collection getKnownTasks() { - synchronized (tasks) { - return Lists.newArrayList(tasks.values()); - } + return Lists.newArrayList(tasks.values()); } @@ -393,23 +385,19 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner @Override public Collection getRunningTasks() { - synchronized (tasks) { - return tasks.values() - .stream() - .filter(KubernetesWorkItem::isRunning) - .collect(Collectors.toList()); - } + return tasks.values() + .stream() + .filter(KubernetesWorkItem::isRunning) + .collect(Collectors.toList()); } @Override public Collection getPendingTasks() { - synchronized (tasks) { - return tasks.values() - .stream() - .filter(KubernetesWorkItem::isPending) - .collect(Collectors.toList()); - } + return tasks.values() + .stream() + .filter(KubernetesWorkItem::isPending) + .collect(Collectors.toList()); } @Nullable