mirror of https://github.com/apache/druid.git
Fix bug with cancelling pending tasks when running kubernetes ingestion. (#16036)
* Fix bug * Add new test
This commit is contained in:
parent
67ae0ff450
commit
80cab51d50
|
@ -263,11 +263,14 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
workItem.shutdown();
|
||||||
|
if (!workItem.getResult().isDone()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
synchronized (tasks) {
|
synchronized (tasks) {
|
||||||
tasks.remove(taskid);
|
tasks.remove(taskid);
|
||||||
}
|
}
|
||||||
|
|
||||||
workItem.shutdown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class KubernetesPeonClient
|
||||||
this.emitter = emitter;
|
this.emitter = emitter;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, TimeUnit timeUnit)
|
public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, TimeUnit timeUnit) throws IllegalStateException
|
||||||
{
|
{
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
// launch job
|
// launch job
|
||||||
|
@ -74,12 +74,15 @@ public class KubernetesPeonClient
|
||||||
Pod result = client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
|
Pod result = client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
|
||||||
.waitUntilCondition(pod -> {
|
.waitUntilCondition(pod -> {
|
||||||
if (pod == null) {
|
if (pod == null) {
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
return pod.getStatus() != null && pod.getStatus().getPodIP() != null;
|
return pod.getStatus() != null && pod.getStatus().getPodIP() != null;
|
||||||
}, howLong, timeUnit);
|
}, howLong, timeUnit);
|
||||||
|
|
||||||
|
if (result == null) {
|
||||||
|
throw new IllegalStateException("K8s pod for the task [%s] appeared and disappeared. It can happen if the task was canceled");
|
||||||
|
}
|
||||||
long duration = System.currentTimeMillis() - start;
|
long duration = System.currentTimeMillis() - start;
|
||||||
log.info("Took task %s %d ms for pod to startup", jobName, duration);
|
|
||||||
emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
|
emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
|
||||||
return result;
|
return result;
|
||||||
});
|
});
|
||||||
|
|
|
@ -75,6 +75,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
|
||||||
@Mock private KubernetesPeonClient peonClient;
|
@Mock private KubernetesPeonClient peonClient;
|
||||||
@Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
|
@Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
|
||||||
@Mock private ServiceEmitter emitter;
|
@Mock private ServiceEmitter emitter;
|
||||||
|
@Mock private ListenableFuture<TaskStatus> statusFuture;
|
||||||
|
|
||||||
private KubernetesTaskRunnerConfig config;
|
private KubernetesTaskRunnerConfig config;
|
||||||
private KubernetesTaskRunner runner;
|
private KubernetesTaskRunner runner;
|
||||||
|
@ -330,16 +331,35 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
|
||||||
@Test
|
@Test
|
||||||
public void test_shutdown_withExistingTask_removesTaskFromMap()
|
public void test_shutdown_withExistingTask_removesTaskFromMap()
|
||||||
{
|
{
|
||||||
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
|
KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) {
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void shutdown()
|
protected synchronized void shutdown()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
EasyMock.expect(statusFuture.isDone()).andReturn(true).anyTimes();
|
||||||
|
replayAll();
|
||||||
runner.tasks.put(task.getId(), workItem);
|
runner.tasks.put(task.getId(), workItem);
|
||||||
runner.shutdown(task.getId(), "");
|
runner.shutdown(task.getId(), "");
|
||||||
Assert.assertTrue(runner.tasks.isEmpty());
|
Assert.assertTrue(runner.tasks.isEmpty());
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_shutdown_withExistingTask_futureIncomplete_removesTaskFromMap()
|
||||||
|
{
|
||||||
|
KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) {
|
||||||
|
@Override
|
||||||
|
protected synchronized void shutdown()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
};
|
||||||
|
EasyMock.expect(statusFuture.isDone()).andReturn(false).anyTimes();
|
||||||
|
replayAll();
|
||||||
|
runner.tasks.put(task.getId(), workItem);
|
||||||
|
runner.shutdown(task.getId(), "");
|
||||||
|
Assert.assertEquals(1, runner.tasks.size());
|
||||||
|
verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class KubernetesPeonClientTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void test_launchPeonJobAndWaitForStart_withDisappearingPod_throwsKubernetesClientTimeoutException()
|
void test_launchPeonJobAndWaitForStart_withDisappearingPod_throwIllegalStateExceptionn()
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
|
@ -114,6 +114,33 @@ public class KubernetesPeonClientTest
|
||||||
.build()
|
.build()
|
||||||
).once();
|
).once();
|
||||||
|
|
||||||
|
Assertions.assertThrows(
|
||||||
|
IllegalStateException.class,
|
||||||
|
() -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test_launchPeonJobAndWaitForStart_withPendingPod_throwIllegalStateExceptionn()
|
||||||
|
{
|
||||||
|
Job job = new JobBuilder()
|
||||||
|
.withNewMetadata()
|
||||||
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
|
.endMetadata()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Pod pod = new PodBuilder()
|
||||||
|
.withNewMetadata()
|
||||||
|
.withName(POD_NAME)
|
||||||
|
.addToLabels("job-name", KUBERNETES_JOB_NAME)
|
||||||
|
.endMetadata()
|
||||||
|
.withNewStatus()
|
||||||
|
.withPodIP(null)
|
||||||
|
.endStatus()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
client.pods().inNamespace(NAMESPACE).resource(pod).create();
|
||||||
|
|
||||||
Assertions.assertThrows(
|
Assertions.assertThrows(
|
||||||
KubernetesClientTimeoutException.class,
|
KubernetesClientTimeoutException.class,
|
||||||
() -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS)
|
() -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS)
|
||||||
|
|
Loading…
Reference in New Issue