Report task/pending/time metrics for k8s based ingestion (#14698)

Changes:
* Add and invoke `StateListener` when state changes in `KubernetesPeonLifecycle`
* Report `task/pending/time` metric in `KubernetesTaskRunner` when state moves to RUNNING
This commit is contained in:
YongGang 2023-08-03 20:37:11 -07:00 committed by GitHub
parent ba957a9b97
commit 3335040b22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 384 additions and 67 deletions

View File

@ -45,6 +45,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -60,6 +61,12 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class KubernetesPeonLifecycle
{
@FunctionalInterface
public interface TaskStateListener
{
void stateChanged(State state, String taskId);
}
private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
protected enum State
@ -79,6 +86,7 @@ public class KubernetesPeonLifecycle
private final TaskLogs taskLogs;
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
@MonotonicNonNull
private LogWatch logWatch;
@ -89,13 +97,15 @@ public class KubernetesPeonLifecycle
Task task,
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper
ObjectMapper mapper,
TaskStateListener stateListener
)
{
this.taskId = new K8sTaskId(task);
this.kubernetesClient = kubernetesClient;
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
}
/**
@ -110,13 +120,7 @@ public class KubernetesPeonLifecycle
protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
{
try {
Preconditions.checkState(
state.compareAndSet(State.NOT_STARTED, State.PENDING),
"Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
taskId.getOriginalTaskId(),
state.get(),
State.PENDING
);
updateState(new State[]{State.NOT_STARTED}, State.PENDING);
// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
@ -134,7 +138,7 @@ public class KubernetesPeonLifecycle
throw e;
}
finally {
state.set(State.STOPPED);
stopTask();
}
}
@ -148,16 +152,7 @@ public class KubernetesPeonLifecycle
protected synchronized TaskStatus join(long timeout) throws IllegalStateException
{
try {
Preconditions.checkState(
(
state.compareAndSet(State.NOT_STARTED, State.RUNNING) ||
state.compareAndSet(State.PENDING, State.RUNNING)
),
"Task [%s] failed to join: invalid peon lifecycle state transition [%s]->[%s]",
taskId.getOriginalTaskId(),
state.get(),
State.RUNNING
);
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
@ -176,7 +171,7 @@ public class KubernetesPeonLifecycle
log.warn(e, "Task [%s] cleanup failed", taskId);
}
state.set(State.STOPPED);
stopTask();
}
}
@ -326,4 +321,23 @@ public class KubernetesPeonLifecycle
log.warn(e, "Failed to manage temporary log file for task [%s]", taskId.getOriginalTaskId());
}
}
private void stopTask()
{
if (!State.STOPPED.equals(state.get())) {
updateState(new State[]{State.NOT_STARTED, State.PENDING, State.RUNNING}, State.STOPPED);
}
}
private void updateState(State[] acceptedStates, State targetState)
{
Preconditions.checkState(
Arrays.stream(acceptedStates).anyMatch(s -> state.compareAndSet(s, targetState)),
"Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
taskId.getOriginalTaskId(),
state.get(),
targetState
);
stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
}
}

View File

@ -42,13 +42,14 @@ public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
}
@Override
public KubernetesPeonLifecycle build(Task task)
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
{
return new KubernetesPeonLifecycle(
task,
client,
taskLogs,
mapper
mapper,
stateListener
);
}
}

View File

@ -31,16 +31,20 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@ -48,6 +52,7 @@ import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.io.IOException;
@ -100,6 +105,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
private final ListeningExecutorService exec;
private final HttpClient httpClient;
private final PeonLifecycleFactory peonLifecycleFactory;
private final ServiceEmitter emitter;
public KubernetesTaskRunner(
@ -107,7 +113,8 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
KubernetesTaskRunnerConfig config,
KubernetesPeonClient client,
HttpClient httpClient,
PeonLifecycleFactory peonLifecycleFactory
PeonLifecycleFactory peonLifecycleFactory,
ServiceEmitter emitter
)
{
this.adapter = adapter;
@ -119,6 +126,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
);
this.emitter = emitter;
}
@Override
@ -162,7 +170,10 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
protected TaskStatus doTask(Task task, boolean run)
{
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
);
synchronized (tasks) {
KubernetesWorkItem workItem = tasks.get(task.getId());
@ -206,6 +217,33 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
}
}
@VisibleForTesting
protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String taskId)
{
switch (state) {
case RUNNING:
KubernetesWorkItem workItem;
synchronized (tasks) {
workItem = tasks.get(taskId);
if (workItem == null) {
log.error("Task [%s] disappeared", taskId);
return;
}
}
ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, workItem.getTask());
emitter.emit(
metricBuilder.build(
"task/pending/time",
new Duration(workItem.getCreatedTime(), DateTimes.nowUtc()).getMillis()
)
);
default:
// ignore other state transition now
return;
}
}
@Override
public void updateStatus(Task task, TaskStatus status)
{

View File

@ -28,6 +28,7 @@ import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
@ -54,6 +55,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final TaskConfig taskConfig;
private final Properties properties;
private final DruidKubernetesClient druidKubernetesClient;
private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;
@ -67,7 +69,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
@Self DruidNode druidNode,
TaskConfig taskConfig,
Properties properties,
DruidKubernetesClient druidKubernetesClient
DruidKubernetesClient druidKubernetesClient,
ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
@ -79,6 +82,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
this.taskConfig = taskConfig;
this.properties = properties;
this.druidKubernetesClient = druidKubernetesClient;
this.emitter = emitter;
}
@Override
@ -96,7 +100,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
kubernetesTaskRunnerConfig,
peonClient,
httpClient,
new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper)
new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper),
emitter
);
return runner;
}

View File

@ -123,4 +123,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
{
return task.getDataSource();
}
public Task getTask()
{
return task;
}
}

View File

@ -23,5 +23,5 @@ import org.apache.druid.indexing.common.task.Task;
public interface PeonLifecycleFactory
{
KubernetesPeonLifecycle build(Task task);
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
}

View File

@ -63,6 +63,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Mock TaskLogs taskLogs;
@Mock LogWatch logWatch;
@Mock KubernetesPeonLifecycle.TaskStateListener stateListener;
private ObjectMapper mapper;
private Task task;
@ -80,7 +81,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_run()
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
)
{
@Override
protected synchronized TaskStatus join(long timeout)
{
@ -98,6 +106,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
replayAll();
TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L);
@ -112,7 +125,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_run_whenCalledMultipleTimes_raisesIllegalStateException()
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
)
{
@Override
protected synchronized TaskStatus join(long timeout)
{
@ -130,6 +150,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
replayAll();
peonLifecycle.run(job, 0L, 0L);
@ -148,7 +173,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_run_whenExceptionRaised_setsRunnerTaskStateToNone()
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
)
{
@Override
protected synchronized TaskStatus join(long timeout)
{
@ -164,9 +196,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
EasyMock.expect(kubernetesClient.deletePeonJob(
new K8sTaskId(ID)
new K8sTaskId(ID)
)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
replayAll();
@ -183,7 +219,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
EasyMock.eq(k8sTaskId),
@ -194,6 +236,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@ -213,7 +259,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
Job job = new JobBuilder()
.withNewMetadata()
@ -237,6 +289,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@ -256,7 +312,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
Job job = new JobBuilder()
.withNewMetadata()
@ -282,6 +344,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@ -307,7 +373,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
Job job = new JobBuilder()
.withNewMetadata()
@ -327,6 +399,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@ -348,7 +424,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFailedTaskStatus() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
Job job = new JobBuilder()
.withNewMetadata()
@ -368,6 +450,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@ -389,7 +475,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
Job job = new JobBuilder()
.withNewMetadata()
@ -411,6 +503,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
);
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall().andThrow(new IOException());
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@ -427,11 +523,16 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState());
}
@Test
public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_throwsException() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
EasyMock.eq(k8sTaskId),
@ -443,6 +544,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
@ -462,14 +567,26 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_shutdown_withNotStartedTaskState()
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
peonLifecycle.shutdown();
}
@Test
public void test_shutdown_withPendingTaskState() throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@ -484,7 +601,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_shutdown_withRunningTaskState() throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@ -499,7 +622,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_shutdown_withStoppedTaskState() throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
peonLifecycle.shutdown();
@ -508,7 +637,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_streamLogs_withNotStartedTaskState() throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED);
peonLifecycle.streamLogs();
@ -517,7 +652,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_streamLogs_withPendingTaskState() throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
peonLifecycle.streamLogs();
@ -526,7 +667,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_streamLogs_withRunningTaskState() throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(
@ -543,7 +690,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_streamLogs_withStoppedTaskState() throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
peonLifecycle.streamLogs();
@ -553,7 +706,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withNotStartedTaskState_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED);
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
@ -563,7 +722,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withPendingTaskState_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
@ -573,7 +738,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());
@ -589,7 +760,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
Pod pod = new PodBuilder()
@ -611,7 +788,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_returnsLocation()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
Pod pod = new PodBuilder()
@ -640,7 +823,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_saveTaskLocation()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
Pod pod = new PodBuilder()
@ -669,7 +858,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
Pod pod = new PodBuilder()
@ -699,7 +894,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withStoppedTaskState_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());

View File

@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
@ -32,6 +33,7 @@ import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogs;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -50,6 +52,7 @@ public class KubernetesTaskRunnerFactoryTest
private Properties properties;
private DruidKubernetesClient druidKubernetesClient;
@Mock private ServiceEmitter emitter;
@Before
public void setup()
@ -86,7 +89,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
properties,
druidKubernetesClient
druidKubernetesClient,
emitter
);
KubernetesTaskRunner expectedRunner = factory.build();
@ -107,7 +111,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
properties,
druidKubernetesClient
druidKubernetesClient,
emitter
);
KubernetesTaskRunner runner = factory.build();
@ -133,7 +138,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
properties,
druidKubernetesClient
druidKubernetesClient,
emitter
);
KubernetesTaskRunner runner = factory.build();
@ -157,7 +163,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
druidKubernetesClient
druidKubernetesClient,
emitter
);
KubernetesTaskRunner runner = factory.build();
@ -186,7 +193,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
druidKubernetesClient
druidKubernetesClient,
emitter
);
Assert.assertThrows(
@ -216,7 +224,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
druidKubernetesClient
druidKubernetesClient,
emitter
);
KubernetesTaskRunner runner = factory.build();
@ -240,7 +249,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
druidKubernetesClient
druidKubernetesClient,
emitter
);
KubernetesTaskRunner runner = factory.build();
@ -267,7 +277,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
druidKubernetesClient
druidKubernetesClient,
emitter
);
KubernetesTaskRunner runner = factory.build();

View File

@ -33,6 +33,8 @@ import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@ -73,6 +75,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
@Mock private TaskAdapter taskAdapter;
@Mock private KubernetesPeonClient peonClient;
@Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
@Mock private ServiceEmitter emitter;
private KubernetesTaskRunnerConfig config;
private KubernetesTaskRunner runner;
@ -92,7 +95,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
emitter
);
}
@ -289,7 +293,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
emitter
) {
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
@ -325,7 +330,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
emitter
) {
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
@ -599,4 +605,26 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
verifyAll();
}
@Test
public void test_metricsReported_whenTaskStateChange()
{
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
@Override
public TaskLocation getLocation()
{
return TaskLocation.unknown();
}
};
runner.tasks.put(task.getId(), workItem);
emitter.emit(EasyMock.anyObject(ServiceEventBuilder.class));
replayAll();
runner.emitTaskStateMetrics(KubernetesPeonLifecycle.State.RUNNING, task.getId());
verifyAll();
}
}

View File

@ -55,6 +55,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
null,
null
));
@ -64,6 +65,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
null,
null
))
);
@ -157,6 +159,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
null,
null
));
@ -170,6 +173,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
null,
null
) {
@Override
@ -191,6 +195,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
null,
null
) {
@Override
@ -212,6 +217,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
null,
null
) {
@Override
@ -239,6 +245,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
null,
null
));
Assert.assertFalse(workItem.streamTaskLogs().isPresent());
@ -257,6 +264,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
null,
null
));
@ -274,4 +282,10 @@ public class KubernetesWorkItemTest extends EasyMockSupport
{
Assert.assertEquals(task.getDataSource(), workItem.getDataSource());
}
@Test
public void test_getTask()
{
Assert.assertEquals(task, workItem.getTask());
}
}

View File

@ -31,7 +31,7 @@ public class TestPeonLifecycleFactory implements PeonLifecycleFactory
}
@Override
public KubernetesPeonLifecycle build(Task task)
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
{
return kubernetesPeonLifecycle;
}