Report more metrics to monitor K8s task runner (#14771)

* Report pod running metrics to monitor K8s task runner

* refine method definition

* fix checkstyle

* implement task metrics

* more comment

* address comments

* update doc for the new metrics reported

* fix checkstyle

* refine method definition

* minor refine
This commit is contained in:
YongGang 2023-08-16 11:03:53 -07:00 committed by GitHub
parent 97c3773012
commit 3954685aae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 76 additions and 15 deletions

View File

@ -234,6 +234,12 @@ data:
|`druid.indexer.runner.graceTerminationPeriodSeconds`| `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. |`PT30S` (K8s default)|No|
|`druid.indexer.runner.capacity`| `Integer` | Number of concurrent jobs that can be sent to Kubernetes. |`2147483647`|No|
### Metrics added
|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
| `k8s/peon/startup/time` | Metric indicating the milliseconds for peon pod to startup. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskStatus`, `tags` |Varies|
### Gotchas
- All Druid Pods belonging to one Druid cluster must be inside the same Kubernetes namespace.

View File

@ -84,6 +84,7 @@ public class KubernetesPeonLifecycle
private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
private final K8sTaskId taskId;
private final TaskLogs taskLogs;
private final Task task;
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
@ -102,6 +103,7 @@ public class KubernetesPeonLifecycle
)
{
this.taskId = new K8sTaskId(task);
this.task = task;
this.kubernetesClient = kubernetesClient;
this.taskLogs = taskLogs;
this.mapper = mapper;
@ -126,6 +128,7 @@ public class KubernetesPeonLifecycle
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
job,
task,
launchTimeout,
TimeUnit.MILLISECONDS
);

View File

@ -108,7 +108,8 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
private final HttpClient httpClient;
private final PeonLifecycleFactory peonLifecycleFactory;
private final ServiceEmitter emitter;
// currently worker categories aren't supported, so it's hardcoded.
protected static final String WORKER_CATEGORY = "_k8s_worker_category";
public KubernetesTaskRunner(
TaskAdapter adapter,
@ -356,7 +357,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
return ImmutableMap.of("taskQueue", (long) config.getCapacity());
return ImmutableMap.of(WORKER_CATEGORY, (long) config.getCapacity());
}
@Override
@ -374,13 +375,13 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override
public Map<String, Long> getIdleTaskSlotCount()
{
return Collections.emptyMap();
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, config.getCapacity() - tasks.size()));
}
@Override
public Map<String, Long> getUsedTaskSlotCount()
{
return Collections.emptyMap();
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(config.getCapacity(), tasks.size()));
}
@Override

View File

@ -92,7 +92,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
KubernetesPeonClient peonClient = new KubernetesPeonClient(
druidKubernetesClient,
kubernetesTaskRunnerConfig.getNamespace(),
kubernetesTaskRunnerConfig.isDebugJobs()
kubernetesTaskRunnerConfig.isDebugJobs(),
emitter
);
runner = new KubernetesTaskRunner(

View File

@ -25,8 +25,12 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.io.InputStream;
import java.sql.Timestamp;
@ -42,15 +46,22 @@ public class KubernetesPeonClient
private final KubernetesClientApi clientApi;
private final String namespace;
private final boolean debugJobs;
private final ServiceEmitter emitter;
public KubernetesPeonClient(KubernetesClientApi clientApi, String namespace, boolean debugJobs)
public KubernetesPeonClient(
KubernetesClientApi clientApi,
String namespace,
boolean debugJobs,
ServiceEmitter emitter
)
{
this.clientApi = clientApi;
this.namespace = namespace;
this.debugJobs = debugJobs;
this.emitter = emitter;
}
public Pod launchPeonJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit)
public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, TimeUnit timeUnit)
{
long start = System.currentTimeMillis();
// launch job
@ -69,6 +80,7 @@ public class KubernetesPeonClient
}, howLong, timeUnit);
long duration = System.currentTimeMillis() - start;
log.info("Took task %s %d ms for pod to startup", jobName, duration);
emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
return result;
});
}
@ -254,4 +266,11 @@ public class KubernetesPeonClient
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + jobName + " not found");
}
}
private void emitK8sPodMetrics(Task task, String metric, long durationMs)
{
ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
emitter.emit(metricBuilder.build(metric, durationMs));
}
}

View File

@ -100,6 +100,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
EasyMock.eq(job),
EasyMock.eq(task),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
@ -144,6 +145,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
EasyMock.eq(job),
EasyMock.eq(task),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
@ -192,6 +194,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
EasyMock.eq(job),
EasyMock.eq(task),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);

View File

@ -367,7 +367,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
MatcherAssert.assertThat(slotCount, Matchers.allOf(
Matchers.aMapWithSize(1),
Matchers.hasEntry(
Matchers.equalTo("taskQueue"),
Matchers.equalTo(KubernetesTaskRunner.WORKER_CATEGORY),
Matchers.equalTo(1L)
)
));

View File

@ -30,7 +30,10 @@ import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -38,6 +41,8 @@ import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -54,12 +59,23 @@ public class KubernetesPeonClientTest
private KubernetesMockServer server;
private KubernetesClientApi clientApi;
private KubernetesPeonClient instance;
private ServiceEmitter serviceEmitter;
private Collection<Event> events;
@BeforeEach
public void setup()
{
clientApi = new TestKubernetesClient(this.client);
instance = new KubernetesPeonClient(clientApi, NAMESPACE, false);
events = new ArrayList<>();
serviceEmitter = new ServiceEmitter("service", "host", null)
{
@Override
public void emit(Event event)
{
events.add(event);
}
};
instance = new KubernetesPeonClient(clientApi, NAMESPACE, false, serviceEmitter);
}
@Test
@ -83,9 +99,10 @@ public class KubernetesPeonClientTest
client.pods().inNamespace(NAMESPACE).resource(pod).create();
Pod peonPod = instance.launchPeonJobAndWaitForStart(job, 1, TimeUnit.SECONDS);
Pod peonPod = instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS);
Assertions.assertNotNull(peonPod);
Assertions.assertEquals(1, events.size());
}
@Test
@ -111,7 +128,7 @@ public class KubernetesPeonClientTest
Assertions.assertThrows(
KubernetesClientTimeoutException.class,
() -> instance.launchPeonJobAndWaitForStart(job, 1, TimeUnit.SECONDS)
() -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS)
);
}
@ -204,7 +221,8 @@ public class KubernetesPeonClientTest
KubernetesPeonClient instance = new KubernetesPeonClient(
new TestKubernetesClient(this.client),
NAMESPACE,
true
true,
serviceEmitter
);
Job job = new JobBuilder()
@ -228,7 +246,8 @@ public class KubernetesPeonClientTest
KubernetesPeonClient instance = new KubernetesPeonClient(
new TestKubernetesClient(this.client),
NAMESPACE,
true
true,
serviceEmitter
);
Assertions.assertTrue(instance.deletePeonJob(new K8sTaskId(ID)));

View File

@ -34,6 +34,8 @@ import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.JobResponse;
@ -90,7 +92,14 @@ public class DruidPeonClientIntegrationTest
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
k8sClient = new DruidKubernetesClient();
peonClient = new KubernetesPeonClient(k8sClient, "default", false);
ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", null)
{
@Override
public void emit(Event event)
{
}
};
peonClient = new KubernetesPeonClient(k8sClient, "default", false, serviceEmitter);
druidNode = new DruidNode(
"test",
null,
@ -130,7 +139,7 @@ public class DruidPeonClientIntegrationTest
Job job = adapter.createJobFromPodSpec(podSpec, task, context);
// launch the job and wait to start...
peonClient.launchPeonJobAndWaitForStart(job, 1, TimeUnit.MINUTES);
peonClient.launchPeonJobAndWaitForStart(job, task, 1, TimeUnit.MINUTES);
// there should be one job that is a k8s peon job that exists
List<Job> jobs = peonClient.getPeonJobs();