From 3335040b2206588f4034bc8c6004d67c9ce75773 Mon Sep 17 00:00:00 2001 From: YongGang Date: Thu, 3 Aug 2023 20:37:11 -0700 Subject: [PATCH] Report task/pending/time metrics for k8s based ingestion (#14698) Changes: * Add and invoke `StateListener` when state changes in `KubernetesPeonLifecycle` * Report `task/pending/time` metric in `KubernetesTaskRunner` when state moves to RUNNING --- .../k8s/overlord/KubernetesPeonLifecycle.java | 54 ++-- .../KubernetesPeonLifecycleFactory.java | 5 +- .../k8s/overlord/KubernetesTaskRunner.java | 42 ++- .../overlord/KubernetesTaskRunnerFactory.java | 9 +- .../k8s/overlord/KubernetesWorkItem.java | 5 + .../k8s/overlord/PeonLifecycleFactory.java | 2 +- .../overlord/KubernetesPeonLifecycleTest.java | 257 ++++++++++++++++-- .../KubernetesTaskRunnerFactoryTest.java | 27 +- .../overlord/KubernetesTaskRunnerTest.java | 34 ++- .../k8s/overlord/KubernetesWorkItemTest.java | 14 + .../overlord/TestPeonLifecycleFactory.java | 2 +- 11 files changed, 384 insertions(+), 67 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index b0d483e5278..302a568a230 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -45,6 +45,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -60,6 +61,12 @@ import java.util.concurrent.atomic.AtomicReference; */ public class KubernetesPeonLifecycle { + @FunctionalInterface + public interface TaskStateListener + { + void stateChanged(State state, String taskId); + } + private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class); protected enum State @@ -79,6 +86,7 @@ public class KubernetesPeonLifecycle private final TaskLogs taskLogs; private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; + private final TaskStateListener stateListener; @MonotonicNonNull private LogWatch logWatch; @@ -89,13 +97,15 @@ public class KubernetesPeonLifecycle Task task, KubernetesPeonClient kubernetesClient, TaskLogs taskLogs, - ObjectMapper mapper + ObjectMapper mapper, + TaskStateListener stateListener ) { this.taskId = new K8sTaskId(task); this.kubernetesClient = kubernetesClient; this.taskLogs = taskLogs; this.mapper = mapper; + this.stateListener = stateListener; } /** @@ -110,13 +120,7 @@ public class KubernetesPeonLifecycle protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException { try { - Preconditions.checkState( - state.compareAndSet(State.NOT_STARTED, State.PENDING), - "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]", - taskId.getOriginalTaskId(), - state.get(), - State.PENDING - ); + updateState(new State[]{State.NOT_STARTED}, State.PENDING); // In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation. taskLocation = null; @@ -134,7 +138,7 @@ public class KubernetesPeonLifecycle throw e; } finally { - state.set(State.STOPPED); + stopTask(); } } @@ -148,16 +152,7 @@ public class KubernetesPeonLifecycle protected synchronized TaskStatus join(long timeout) throws IllegalStateException { try { - Preconditions.checkState( - ( - state.compareAndSet(State.NOT_STARTED, State.RUNNING) || - state.compareAndSet(State.PENDING, State.RUNNING) - ), - "Task [%s] failed to join: invalid peon lifecycle state transition [%s]->[%s]", - taskId.getOriginalTaskId(), - state.get(), - State.RUNNING - ); + updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, @@ -176,7 +171,7 @@ public class KubernetesPeonLifecycle log.warn(e, "Task [%s] cleanup failed", taskId); } - state.set(State.STOPPED); + stopTask(); } } @@ -326,4 +321,23 @@ public class KubernetesPeonLifecycle log.warn(e, "Failed to manage temporary log file for task [%s]", taskId.getOriginalTaskId()); } } + + private void stopTask() + { + if (!State.STOPPED.equals(state.get())) { + updateState(new State[]{State.NOT_STARTED, State.PENDING, State.RUNNING}, State.STOPPED); + } + } + + private void updateState(State[] acceptedStates, State targetState) + { + Preconditions.checkState( + Arrays.stream(acceptedStates).anyMatch(s -> state.compareAndSet(s, targetState)), + "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]", + taskId.getOriginalTaskId(), + state.get(), + targetState + ); + stateListener.stateChanged(state.get(), taskId.getOriginalTaskId()); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java index 2f2375789be..bf4e3a71257 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java @@ -42,13 +42,14 @@ public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory } @Override - public KubernetesPeonLifecycle build(Task task) + public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener) { return new KubernetesPeonLifecycle( task, client, taskLogs, - mapper + mapper, + stateListener ); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 12c5bb60297..89d33c7404b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -31,16 +31,20 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerUtils; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; @@ -48,6 +52,7 @@ import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.io.IOException; @@ -100,6 +105,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner private final ListeningExecutorService exec; private final HttpClient httpClient; private final PeonLifecycleFactory peonLifecycleFactory; + private final ServiceEmitter emitter; public KubernetesTaskRunner( @@ -107,7 +113,8 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner KubernetesTaskRunnerConfig config, KubernetesPeonClient client, HttpClient httpClient, - PeonLifecycleFactory peonLifecycleFactory + PeonLifecycleFactory peonLifecycleFactory, + ServiceEmitter emitter ) { this.adapter = adapter; @@ -119,6 +126,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner this.exec = MoreExecutors.listeningDecorator( Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d") ); + this.emitter = emitter; } @Override @@ -162,7 +170,10 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner protected TaskStatus doTask(Task task, boolean run) { try { - KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); + KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( + task, + this::emitTaskStateMetrics + ); synchronized (tasks) { KubernetesWorkItem workItem = tasks.get(task.getId()); @@ -206,6 +217,33 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner } } + @VisibleForTesting + protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String taskId) + { + switch (state) { + case RUNNING: + KubernetesWorkItem workItem; + synchronized (tasks) { + workItem = tasks.get(taskId); + if (workItem == null) { + log.error("Task [%s] disappeared", taskId); + return; + } + } + ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + IndexTaskUtils.setTaskDimensions(metricBuilder, workItem.getTask()); + emitter.emit( + metricBuilder.build( + "task/pending/time", + new Duration(workItem.getCreatedTime(), DateTimes.nowUtc()).getMillis() + ) + ); + default: + // ignore other state transition now + return; + } + } + @Override public void updateStatus(Task task, TaskStatus status) { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index 5b63872cab7..76698ba8fe3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -28,6 +28,7 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; @@ -54,6 +55,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory joinAsync(Task task) @@ -325,7 +330,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport config, peonClient, httpClient, - new TestPeonLifecycleFactory(kubernetesPeonLifecycle) + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter ) { @Override protected ListenableFuture joinAsync(Task task) @@ -599,4 +605,26 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport verifyAll(); } + + @Test + public void test_metricsReported_whenTaskStateChange() + { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + @Override + public TaskLocation getLocation() + { + return TaskLocation.unknown(); + } + }; + + runner.tasks.put(task.getId(), workItem); + + emitter.emit(EasyMock.anyObject(ServiceEventBuilder.class)); + + replayAll(); + + runner.emitTaskStateMetrics(KubernetesPeonLifecycle.State.RUNNING, task.getId()); + + verifyAll(); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index b272230b079..5f951770480 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -55,6 +55,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport task, null, null, + null, null )); @@ -64,6 +65,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport task, null, null, + null, null )) ); @@ -157,6 +159,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport task, null, null, + null, null )); @@ -170,6 +173,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport task, null, null, + null, null ) { @Override @@ -191,6 +195,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport task, null, null, + null, null ) { @Override @@ -212,6 +217,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport task, null, null, + null, null ) { @Override @@ -239,6 +245,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport task, null, null, + null, null )); Assert.assertFalse(workItem.streamTaskLogs().isPresent()); @@ -257,6 +264,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport task, null, null, + null, null )); @@ -274,4 +282,10 @@ public class KubernetesWorkItemTest extends EasyMockSupport { Assert.assertEquals(task.getDataSource(), workItem.getDataSource()); } + + @Test + public void test_getTask() + { + Assert.assertEquals(task, workItem.getTask()); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java index 333b0490bac..8b8c43c0d71 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java @@ -31,7 +31,7 @@ public class TestPeonLifecycleFactory implements PeonLifecycleFactory } @Override - public KubernetesPeonLifecycle build(Task task) + public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener) { return kubernetesPeonLifecycle; }