Fix log streaming (#14285)

* Fix log streaming

* Add watch log

* Add unit tests

* long running client

* singleton client

* Remove accidental close
This commit is contained in:
George Shiqi Wu 2023-05-22 14:19:53 -04:00 committed by GitHub
parent 36a084e021
commit cb65135b99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 287 additions and 181 deletions

View File

@ -21,7 +21,10 @@ package org.apache.druid.k8s.overlord;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.discovery.NodeRole; import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders; import org.apache.druid.guice.Binders;
import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceModuleHelper;
@ -34,14 +37,21 @@ import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.initialization.DruidModule; import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.tasklogs.NoopTaskLogs; import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller; import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs; import org.apache.druid.tasklogs.TaskLogs;
@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME) @LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
public class KubernetesOverlordModule implements DruidModule public class KubernetesOverlordModule implements DruidModule
{ {
private static final Logger log = new Logger(KubernetesOverlordModule.class);
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
{ {
@ -66,6 +76,41 @@ public class KubernetesOverlordModule implements DruidModule
configureTaskLogs(binder); configureTaskLogs(binder);
} }
@Provides
@LazySingleton
public DruidKubernetesClient makeKubernetesClient(KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, Lifecycle lifecycle)
{
DruidKubernetesClient client;
if (kubernetesTaskRunnerConfig.isDisableClientProxy()) {
Config config = new ConfigBuilder().build();
config.setHttpsProxy(null);
config.setHttpProxy(null);
client = new DruidKubernetesClient(config);
} else {
client = new DruidKubernetesClient();
}
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start()
{
}
@Override
public void stop()
{
log.info("Stopping overlord Kubernetes client");
client.getClient().close();
}
}
);
return client;
}
private void configureTaskLogs(Binder binder) private void configureTaskLogs(Binder binder)
{ {
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class)); PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskLocation;
@ -37,6 +38,7 @@ import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs; import org.apache.druid.tasklogs.TaskLogs;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -78,6 +80,9 @@ public class KubernetesPeonLifecycle
private final KubernetesPeonClient kubernetesClient; private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper; private final ObjectMapper mapper;
@MonotonicNonNull
private LogWatch logWatch;
protected KubernetesPeonLifecycle( protected KubernetesPeonLifecycle(
Task task, Task task,
KubernetesPeonClient kubernetesClient, KubernetesPeonClient kubernetesClient,
@ -151,16 +156,15 @@ public class KubernetesPeonLifecycle
TimeUnit.MILLISECONDS TimeUnit.MILLISECONDS
); );
saveLogs();
return getTaskStatus(jobResponse.getJobDuration()); return getTaskStatus(jobResponse.getJobDuration());
} }
finally { finally {
try { try {
saveLogs();
shutdown(); shutdown();
} }
catch (Exception e) { catch (Exception e) {
log.warn(e, "Task [%s] shutdown failed", taskId); log.warn(e, "Task [%s] cleanup failed", taskId);
} }
state.set(State.STOPPED); state.set(State.STOPPED);
@ -265,14 +269,31 @@ public class KubernetesPeonLifecycle
return taskStatus.withDuration(duration); return taskStatus.withDuration(duration);
} }
private void saveLogs() protected void startWatchingLogs()
{
if (logWatch != null) {
log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId());
return;
}
try {
Optional<LogWatch> maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId);
if (maybeLogWatch.isPresent()) {
logWatch = maybeLogWatch.get();
}
}
catch (Exception e) {
log.error(e, "Error watching logs from task: %s", taskId);
}
}
protected void saveLogs()
{ {
try { try {
Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log"); Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log");
try { try {
Optional<InputStream> maybeLogStream = streamLogs(); startWatchingLogs();
if (maybeLogStream.isPresent()) { if (logWatch != null) {
FileUtils.copyInputStreamToFile(maybeLogStream.get(), file.toFile()); FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile());
} else { } else {
log.debug("Log stream not found for %s", taskId.getOriginalTaskId()); log.debug("Log stream not found for %s", taskId.getOriginalTaskId());
} }
@ -282,6 +303,9 @@ public class KubernetesPeonLifecycle
log.error(e, "Failed to stream logs for task [%s]", taskId.getOriginalTaskId()); log.error(e, "Failed to stream logs for task [%s]", taskId.getOriginalTaskId());
} }
finally { finally {
if (logWatch != null) {
logWatch.close();
}
Files.deleteIfExists(file); Files.deleteIfExists(file);
} }
} }

View File

@ -21,8 +21,6 @@ package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
@ -55,6 +53,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final DruidNode druidNode; private final DruidNode druidNode;
private final TaskConfig taskConfig; private final TaskConfig taskConfig;
private final Properties properties; private final Properties properties;
private final DruidKubernetesClient druidKubernetesClient;
private KubernetesTaskRunner runner; private KubernetesTaskRunner runner;
@ -67,7 +66,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
TaskLogs taskLogs, TaskLogs taskLogs,
@Self DruidNode druidNode, @Self DruidNode druidNode,
TaskConfig taskConfig, TaskConfig taskConfig,
Properties properties Properties properties,
DruidKubernetesClient druidKubernetesClient
) )
{ {
this.smileMapper = smileMapper; this.smileMapper = smileMapper;
@ -78,29 +78,21 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
this.druidNode = druidNode; this.druidNode = druidNode;
this.taskConfig = taskConfig; this.taskConfig = taskConfig;
this.properties = properties; this.properties = properties;
this.druidKubernetesClient = druidKubernetesClient;
} }
@Override @Override
public KubernetesTaskRunner build() public KubernetesTaskRunner build()
{ {
DruidKubernetesClient client;
if (kubernetesTaskRunnerConfig.isDisableClientProxy()) {
Config config = new ConfigBuilder().build();
config.setHttpsProxy(null);
config.setHttpProxy(null);
client = new DruidKubernetesClient(config);
} else {
client = new DruidKubernetesClient();
}
KubernetesPeonClient peonClient = new KubernetesPeonClient( KubernetesPeonClient peonClient = new KubernetesPeonClient(
client, druidKubernetesClient,
kubernetesTaskRunnerConfig.getNamespace(), kubernetesTaskRunnerConfig.getNamespace(),
kubernetesTaskRunnerConfig.isDebugJobs() kubernetesTaskRunnerConfig.isDebugJobs()
); );
runner = new KubernetesTaskRunner( runner = new KubernetesTaskRunner(
buildTaskAdapter(client), buildTaskAdapter(druidKubernetesClient),
kubernetesTaskRunnerConfig, kubernetesTaskRunnerConfig,
peonClient, peonClient,
httpClient, httpClient,

View File

@ -56,6 +56,7 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
this.shutdownRequested.set(true); this.shutdownRequested.set(true);
if (this.kubernetesPeonLifecycle != null) { if (this.kubernetesPeonLifecycle != null) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown(); this.kubernetesPeonLifecycle.shutdown();
} }
} }

View File

@ -28,6 +28,7 @@ public class DruidKubernetesClient implements KubernetesClientApi
{ {
private final Config config; private final Config config;
private final KubernetesClient kubernetesClient;
public DruidKubernetesClient() public DruidKubernetesClient()
{ {
@ -37,19 +38,22 @@ public class DruidKubernetesClient implements KubernetesClientApi
public DruidKubernetesClient(Config config) public DruidKubernetesClient(Config config)
{ {
this.config = config; this.config = config;
this.kubernetesClient = new KubernetesClientBuilder().withConfig(config).build();
} }
@Override @Override
public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException
{ {
try (KubernetesClient client = getClient()) { return executor.executeRequest(kubernetesClient);
return executor.executeRequest(client);
}
} }
/** This client automatically gets closed by the druid lifecycle, it should not be closed when used as it is
* meant to be reused.
* @return re-useable KubernetesClient
*/
@Override @Override
public KubernetesClient getClient() public KubernetesClient getClient()
{ {
return new KubernetesClientBuilder().withConfig(config).build(); return this.kubernetesClient;
} }
} }

View File

@ -120,7 +120,7 @@ public class KubernetesPeonClient
} }
} }
public Optional<InputStream> getPeonLogs(K8sTaskId taskId) public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
{ {
KubernetesClient k8sClient = clientApi.getClient(); KubernetesClient k8sClient = clientApi.getClient();
try { try {
@ -132,14 +132,34 @@ public class KubernetesPeonClient
.inContainer("main") .inContainer("main")
.watchLog(); .watchLog();
if (logWatch == null) { if (logWatch == null) {
k8sClient.close();
return Optional.absent(); return Optional.absent();
} }
return Optional.of(new LogWatchInputStream(k8sClient, logWatch)); return Optional.of(logWatch);
}
catch (Exception e) {
log.error(e, "Error watching logs from task: %s", taskId);
return Optional.absent();
}
}
public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
{
KubernetesClient k8sClient = clientApi.getClient();
try {
InputStream logStream = k8sClient.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.inContainer("main")
.getLogInputStream();
if (logStream == null) {
return Optional.absent();
}
return Optional.of(logStream);
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Error streaming logs from task: %s", taskId); log.error(e, "Error streaming logs from task: %s", taskId);
k8sClient.close();
return Optional.absent(); return Optional.absent();
} }
} }

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import java.io.IOException;
import java.io.InputStream;
/**
* This wraps the InputStream for k8s client
* When you call close on the stream, it will also close the open
* http connections and the client
*/
public class LogWatchInputStream extends InputStream
{
private final KubernetesClient client;
private final LogWatch logWatch;
public LogWatchInputStream(KubernetesClient client, LogWatch logWatch)
{
this.client = client;
this.logWatch = logWatch;
}
@Override
public int read() throws IOException
{
return logWatch.getOutput().read();
}
@Override
public void close()
{
logWatch.close();
client.close();
}
}

View File

@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
@ -61,6 +62,8 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Mock KubernetesPeonClient kubernetesClient; @Mock KubernetesPeonClient kubernetesClient;
@Mock TaskLogs taskLogs; @Mock TaskLogs taskLogs;
@Mock LogWatch logWatch;
private ObjectMapper mapper; private ObjectMapper mapper;
private Task task; private Task task;
private K8sTaskId k8sTaskId; private K8sTaskId k8sTaskId;
@ -71,6 +74,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
mapper = new TestUtils().getTestObjectMapper(); mapper = new TestUtils().getTestObjectMapper();
task = NoopTask.create(ID, 0); task = NoopTask.create(ID, 0);
k8sTaskId = new K8sTaskId(task); k8sTaskId = new K8sTaskId(task);
EasyMock.expect(logWatch.getOutput()).andReturn(IOUtils.toInputStream("", StandardCharsets.UTF_8)).anyTimes();
} }
@Test @Test
@ -184,12 +188,12 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(null, PeonPhase.FAILED)); )).andReturn(new JobResponse(null, PeonPhase.FAILED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of( EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
IOUtils.toInputStream("", StandardCharsets.UTF_8)
));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
replayAll(); replayAll();
@ -225,14 +229,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of( EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
IOUtils.toInputStream("", StandardCharsets.UTF_8)
));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of(
IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)
)); ));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@ -266,14 +270,18 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn( EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
Optional.of(IOUtils.toInputStream("", StandardCharsets.UTF_8))
);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8))
); );
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@ -313,12 +321,12 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn( EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
Optional.of(IOUtils.toInputStream("", StandardCharsets.UTF_8))
);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@ -354,12 +362,12 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn( EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
Optional.of(IOUtils.toInputStream("", StandardCharsets.UTF_8))
);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@ -395,14 +403,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn( EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
Optional.of(IOUtils.toInputStream("", StandardCharsets.UTF_8))
);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8))
); );
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall().andThrow(new IOException()); EasyMock.expectLastCall().andThrow(new IOException());
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@ -417,6 +425,38 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState());
} }
@Test
public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_throwsException() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
EasyMock.eq(k8sTaskId),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andThrow(new RuntimeException());
// We should still try to push logs
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
replayAll();
Assert.assertThrows(RuntimeException.class, () -> peonLifecycle.join(0L));
verifyAll();
Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState());
}
@Test @Test
public void test_shutdown_withNotStartedTaskState() public void test_shutdown_withNotStartedTaskState()
{ {

View File

@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder; import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter; import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter; import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter; import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
@ -48,6 +49,8 @@ public class KubernetesTaskRunnerFactoryTest
private TaskConfig taskConfig; private TaskConfig taskConfig;
private Properties properties; private Properties properties;
private DruidKubernetesClient druidKubernetesClient;
@Before @Before
public void setup() public void setup()
{ {
@ -68,6 +71,7 @@ public class KubernetesTaskRunnerFactoryTest
); );
taskConfig = new TaskConfigBuilder().setBaseDir("/tmp").build(); taskConfig = new TaskConfigBuilder().setBaseDir("/tmp").build();
properties = new Properties(); properties = new Properties();
druidKubernetesClient = new DruidKubernetesClient();
} }
@Test @Test
@ -81,7 +85,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs, taskLogs,
druidNode, druidNode,
taskConfig, taskConfig,
properties properties,
druidKubernetesClient
); );
KubernetesTaskRunner expectedRunner = factory.build(); KubernetesTaskRunner expectedRunner = factory.build();
@ -101,7 +106,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs, taskLogs,
druidNode, druidNode,
taskConfig, taskConfig,
properties properties,
druidKubernetesClient
); );
KubernetesTaskRunner runner = factory.build(); KubernetesTaskRunner runner = factory.build();
@ -126,7 +132,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs, taskLogs,
druidNode, druidNode,
taskConfig, taskConfig,
properties properties,
druidKubernetesClient
); );
KubernetesTaskRunner runner = factory.build(); KubernetesTaskRunner runner = factory.build();
@ -149,7 +156,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs, taskLogs,
druidNode, druidNode,
taskConfig, taskConfig,
props props,
druidKubernetesClient
); );
KubernetesTaskRunner runner = factory.build(); KubernetesTaskRunner runner = factory.build();
@ -177,7 +185,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs, taskLogs,
druidNode, druidNode,
taskConfig, taskConfig,
props props,
druidKubernetesClient
); );
Assert.assertThrows( Assert.assertThrows(
@ -206,7 +215,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs, taskLogs,
druidNode, druidNode,
taskConfig, taskConfig,
props props,
druidKubernetesClient
); );
KubernetesTaskRunner runner = factory.build(); KubernetesTaskRunner runner = factory.build();
@ -229,7 +239,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs, taskLogs,
druidNode, druidNode,
taskConfig, taskConfig,
props props,
druidKubernetesClient
); );
KubernetesTaskRunner runner = factory.build(); KubernetesTaskRunner runner = factory.build();
@ -255,7 +266,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs, taskLogs,
druidNode, druidNode,
taskConfig, taskConfig,
props props,
druidKubernetesClient
); );
KubernetesTaskRunner runner = factory.build(); KubernetesTaskRunner runner = factory.build();

View File

@ -23,15 +23,24 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
public class KubernetesWorkItemTest @RunWith(EasyMockRunner.class)
public class KubernetesWorkItemTest extends EasyMockSupport
{ {
private KubernetesWorkItem workItem; private KubernetesWorkItem workItem;
private Task task; private Task task;
@Mock
KubernetesPeonLifecycle kubernetesPeonLifecycle;
@Before @Before
public void setup() public void setup()
{ {
@ -70,21 +79,16 @@ public class KubernetesWorkItemTest
@Test @Test
public void test_shutdown_withKubernetesPeonLifecycle() public void test_shutdown_withKubernetesPeonLifecycle()
{ {
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( kubernetesPeonLifecycle.shutdown();
task, EasyMock.expectLastCall();
null, kubernetesPeonLifecycle.startWatchingLogs();
null, EasyMock.expectLastCall();
null
) {
@Override
protected synchronized void shutdown()
{
}
};
workItem.setKubernetesPeonLifecycle(peonLifecycle); replayAll();
workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle);
workItem.shutdown(); workItem.shutdown();
verifyAll();
Assert.assertTrue(workItem.isShutdownRequested()); Assert.assertTrue(workItem.isShutdownRequested());
} }

View File

@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
@ -494,4 +495,78 @@ public class KubernetesPeonClientTest
StringUtils.format("K8s pod with label: job-name=%s not found", ID) StringUtils.format("K8s pod with label: job-name=%s not found", ID)
); );
} }
@Test
void test_getPeonLogsWatcher_withJob_returnsWatchLogInOptional()
{
server.expect().get()
.withPath("/apis/batch/v1/namespaces/namespace/jobs/id")
.andReturn(HttpURLConnection.HTTP_OK, new JobBuilder()
.withNewMetadata()
.withName(JOB_NAME)
.withUid("uid")
.endMetadata()
.withNewSpec()
.withNewTemplate()
.withNewSpec()
.addNewContainer()
.withName("main")
.endContainer()
.endSpec()
.endTemplate()
.endSpec()
.build()
).once();
server.expect().get()
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=controller-uid%3Duid")
.andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder()
.addNewItem()
.withNewMetadata()
.withName(POD_NAME)
.addNewOwnerReference()
.withUid("uid")
.withController(true)
.endOwnerReference()
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName("main")
.endContainer()
.endSpec()
.endItem()
.build()
).once();
server.expect().get()
.withPath("/api/v1/namespaces/namespace/pods/id/log?pretty=false&container=main")
.andReturn(HttpURLConnection.HTTP_OK, "data")
.once();
Optional<LogWatch> maybeLogWatch = instance.getPeonLogWatcher(new K8sTaskId(ID));
Assertions.assertTrue(maybeLogWatch.isPresent());
}
@Test
void test_getPeonLogsWatcher_withoutJob_returnsEmptyOptional()
{
Optional<LogWatch> maybeLogWatch = instance.getPeonLogWatcher(new K8sTaskId(ID));
Assertions.assertFalse(maybeLogWatch.isPresent());
}
@Test
void test_getPeonLogWatcher_withJobWithoutPod_returnsEmptyOptional()
{
Job job = new JobBuilder()
.withNewMetadata()
.withName(JOB_NAME)
.endMetadata()
.build();
client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create();
Optional<LogWatch> maybeLogWatch = instance.getPeonLogWatcher(new K8sTaskId(ID));
Assertions.assertFalse(maybeLogWatch.isPresent());
}
} }

View File

@ -1,54 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class LogWatchInputStreamTest
{
@Test
void testFlow() throws IOException
{
LogWatch logWatch = mock(LogWatch.class);
InputStream inputStream = mock(InputStream.class);
when(inputStream.read()).thenReturn(1);
when(logWatch.getOutput()).thenReturn(inputStream);
KubernetesClient client = mock(KubernetesClient.class);
LogWatchInputStream stream = new LogWatchInputStream(client, logWatch);
int result = stream.read();
Assertions.assertEquals(1, result);
verify(inputStream, times(1)).read();
stream.close();
verify(logWatch, times(1)).close();
verify(client, times(1)).close();
}
}