mirror of https://github.com/apache/druid.git
Fix issue with long data source names (#14620)
* Fix issue with long data source names * Use the regular library * fix overlord utils test
This commit is contained in:
parent
607f511767
commit
28914bbab8
|
@ -226,7 +226,7 @@ public class KubernetesPeonLifecycle
|
||||||
return TaskLocation.unknown();
|
return TaskLocation.unknown();
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId);
|
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
|
||||||
if (!maybePod.isPresent()) {
|
if (!maybePod.isPresent()) {
|
||||||
return TaskLocation.unknown();
|
return TaskLocation.unknown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ import java.util.Objects;
|
||||||
public class K8sTaskId
|
public class K8sTaskId
|
||||||
{
|
{
|
||||||
|
|
||||||
private final String k8sTaskId;
|
private final String k8sJobName;
|
||||||
private final String originalTaskId;
|
private final String originalTaskId;
|
||||||
|
|
||||||
public K8sTaskId(Task task)
|
public K8sTaskId(Task task)
|
||||||
|
@ -37,12 +37,12 @@ public class K8sTaskId
|
||||||
public K8sTaskId(String taskId)
|
public K8sTaskId(String taskId)
|
||||||
{
|
{
|
||||||
this.originalTaskId = taskId;
|
this.originalTaskId = taskId;
|
||||||
this.k8sTaskId = KubernetesOverlordUtils.convertTaskIdToK8sLabel(taskId);
|
this.k8sJobName = KubernetesOverlordUtils.convertTaskIdToJobName(taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getK8sTaskId()
|
public String getK8sJobName()
|
||||||
{
|
{
|
||||||
return k8sTaskId;
|
return k8sJobName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getOriginalTaskId()
|
public String getOriginalTaskId()
|
||||||
|
@ -60,18 +60,18 @@ public class K8sTaskId
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
K8sTaskId k8sTaskId1 = (K8sTaskId) o;
|
K8sTaskId k8sTaskId1 = (K8sTaskId) o;
|
||||||
return k8sTaskId.equals(k8sTaskId1.k8sTaskId) && originalTaskId.equals(k8sTaskId1.originalTaskId);
|
return k8sJobName.equals(k8sTaskId1.k8sJobName) && originalTaskId.equals(k8sTaskId1.originalTaskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode()
|
public int hashCode()
|
||||||
{
|
{
|
||||||
return Objects.hash(k8sTaskId, originalTaskId);
|
return Objects.hash(k8sJobName, originalTaskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return "[ " + originalTaskId + ", " + k8sTaskId + "]";
|
return "[ " + originalTaskId + ", " + k8sJobName + "]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,11 @@
|
||||||
|
|
||||||
package org.apache.druid.k8s.overlord.common;
|
package org.apache.druid.k8s.overlord.common;
|
||||||
|
|
||||||
|
import com.google.common.hash.Hashing;
|
||||||
import org.apache.commons.lang3.RegExUtils;
|
import org.apache.commons.lang3.RegExUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -43,4 +45,10 @@ public class KubernetesOverlordUtils
|
||||||
return taskId == null ? "" : StringUtils.left(RegExUtils.replaceAll(taskId, K8S_TASK_ID_PATTERN, "")
|
return taskId == null ? "" : StringUtils.left(RegExUtils.replaceAll(taskId, K8S_TASK_ID_PATTERN, "")
|
||||||
.toLowerCase(Locale.ENGLISH), 63);
|
.toLowerCase(Locale.ENGLISH), 63);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String convertTaskIdToJobName(String taskId)
|
||||||
|
{
|
||||||
|
return taskId == null ? "" : StringUtils.left(RegExUtils.replaceAll(taskId, K8S_TASK_ID_PATTERN, "")
|
||||||
|
.toLowerCase(Locale.ENGLISH), 30) + "-" + Hashing.murmur3_128().hashString(taskId, StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,10 +56,10 @@ public class KubernetesPeonClient
|
||||||
// launch job
|
// launch job
|
||||||
return clientApi.executeRequest(client -> {
|
return clientApi.executeRequest(client -> {
|
||||||
client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
|
client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
|
||||||
K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
|
String jobName = job.getMetadata().getName();
|
||||||
log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
|
log.info("Successfully submitted job: %s ... waiting for job to launch", jobName);
|
||||||
// wait until the pod is running or complete or failed, any of those is fine
|
// wait until the pod is running or complete or failed, any of those is fine
|
||||||
Pod mainPod = getPeonPodWithRetries(taskId);
|
Pod mainPod = getPeonPodWithRetries(jobName);
|
||||||
Pod result = client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
|
Pod result = client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
|
||||||
.waitUntilCondition(pod -> {
|
.waitUntilCondition(pod -> {
|
||||||
if (pod == null) {
|
if (pod == null) {
|
||||||
|
@ -68,7 +68,7 @@ public class KubernetesPeonClient
|
||||||
return pod.getStatus() != null && pod.getStatus().getPodIP() != null;
|
return pod.getStatus() != null && pod.getStatus().getPodIP() != null;
|
||||||
}, howLong, timeUnit);
|
}, howLong, timeUnit);
|
||||||
long duration = System.currentTimeMillis() - start;
|
long duration = System.currentTimeMillis() - start;
|
||||||
log.info("Took task %s %d ms for pod to startup", taskId, duration);
|
log.info("Took task %s %d ms for pod to startup", jobName, duration);
|
||||||
return result;
|
return result;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,7 @@ public class KubernetesPeonClient
|
||||||
.v1()
|
.v1()
|
||||||
.jobs()
|
.jobs()
|
||||||
.inNamespace(namespace)
|
.inNamespace(namespace)
|
||||||
.withName(taskId.getK8sTaskId())
|
.withName(taskId.getK8sJobName())
|
||||||
.waitUntilCondition(
|
.waitUntilCondition(
|
||||||
x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null
|
x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null
|
||||||
&& (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null)),
|
&& (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null)),
|
||||||
|
@ -106,7 +106,7 @@ public class KubernetesPeonClient
|
||||||
.v1()
|
.v1()
|
||||||
.jobs()
|
.jobs()
|
||||||
.inNamespace(namespace)
|
.inNamespace(namespace)
|
||||||
.withName(taskId.getK8sTaskId())
|
.withName(taskId.getK8sJobName())
|
||||||
.delete().isEmpty());
|
.delete().isEmpty());
|
||||||
if (result) {
|
if (result) {
|
||||||
log.info("Cleaned up k8s task: %s", taskId);
|
log.info("Cleaned up k8s task: %s", taskId);
|
||||||
|
@ -128,7 +128,7 @@ public class KubernetesPeonClient
|
||||||
.v1()
|
.v1()
|
||||||
.jobs()
|
.jobs()
|
||||||
.inNamespace(namespace)
|
.inNamespace(namespace)
|
||||||
.withName(taskId.getK8sTaskId())
|
.withName(taskId.getK8sJobName())
|
||||||
.inContainer("main")
|
.inContainer("main")
|
||||||
.watchLog();
|
.watchLog();
|
||||||
if (logWatch == null) {
|
if (logWatch == null) {
|
||||||
|
@ -150,7 +150,7 @@ public class KubernetesPeonClient
|
||||||
.v1()
|
.v1()
|
||||||
.jobs()
|
.jobs()
|
||||||
.inNamespace(namespace)
|
.inNamespace(namespace)
|
||||||
.withName(taskId.getK8sTaskId())
|
.withName(taskId.getK8sJobName())
|
||||||
.inContainer("main")
|
.inContainer("main")
|
||||||
.getLogInputStream();
|
.getLogInputStream();
|
||||||
if (logStream == null) {
|
if (logStream == null) {
|
||||||
|
@ -212,47 +212,46 @@ public class KubernetesPeonClient
|
||||||
return toDelete;
|
return toDelete;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<Pod> getPeonPod(K8sTaskId taskId)
|
public Optional<Pod> getPeonPod(String jobName)
|
||||||
{
|
{
|
||||||
return clientApi.executeRequest(client -> getPeonPod(client, taskId));
|
return clientApi.executeRequest(client -> getPeonPod(client, jobName));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Optional<Pod> getPeonPod(KubernetesClient client, K8sTaskId taskId)
|
private Optional<Pod> getPeonPod(KubernetesClient client, String jobName)
|
||||||
{
|
{
|
||||||
List<Pod> pods = client.pods()
|
List<Pod> pods = client.pods()
|
||||||
.inNamespace(namespace)
|
.inNamespace(namespace)
|
||||||
.withLabel("job-name", taskId.getK8sTaskId())
|
.withLabel("job-name", jobName)
|
||||||
.list()
|
.list()
|
||||||
.getItems();
|
.getItems();
|
||||||
return pods.isEmpty() ? Optional.absent() : Optional.of(pods.get(0));
|
return pods.isEmpty() ? Optional.absent() : Optional.of(pods.get(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Pod getPeonPodWithRetries(K8sTaskId taskId)
|
public Pod getPeonPodWithRetries(String jobName)
|
||||||
{
|
{
|
||||||
return clientApi.executeRequest(client -> getPeonPodWithRetries(client, taskId, 5, RetryUtils.DEFAULT_MAX_TRIES));
|
return clientApi.executeRequest(client -> getPeonPodWithRetries(client, jobName, 5, RetryUtils.DEFAULT_MAX_TRIES));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Pod getPeonPodWithRetries(KubernetesClient client, K8sTaskId taskId, int quietTries, int maxTries)
|
Pod getPeonPodWithRetries(KubernetesClient client, String jobName, int quietTries, int maxTries)
|
||||||
{
|
{
|
||||||
String k8sTaskId = taskId.getK8sTaskId();
|
|
||||||
try {
|
try {
|
||||||
return RetryUtils.retry(
|
return RetryUtils.retry(
|
||||||
() -> {
|
() -> {
|
||||||
Optional<Pod> maybePod = getPeonPod(client, taskId);
|
Optional<Pod> maybePod = getPeonPod(client, jobName);
|
||||||
if (maybePod.isPresent()) {
|
if (maybePod.isPresent()) {
|
||||||
return maybePod.get();
|
return maybePod.get();
|
||||||
}
|
}
|
||||||
throw new KubernetesResourceNotFoundException(
|
throw new KubernetesResourceNotFoundException(
|
||||||
"K8s pod with label: job-name="
|
"K8s pod with label: job-name="
|
||||||
+ k8sTaskId
|
+ jobName
|
||||||
+ " not found");
|
+ " not found");
|
||||||
},
|
},
|
||||||
DruidK8sConstants.IS_TRANSIENT, quietTries, maxTries
|
DruidK8sConstants.IS_TRANSIENT, quietTries, maxTries
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found");
|
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + jobName + " not found");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,7 +149,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
|
||||||
{
|
{
|
||||||
return new JobBuilder()
|
return new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(k8sTaskId.getK8sTaskId())
|
.withName(k8sTaskId.getK8sJobName())
|
||||||
.addToLabels(labels)
|
.addToLabels(labels)
|
||||||
.addToAnnotations(annotations)
|
.addToAnnotations(annotations)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
|
@ -309,7 +309,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
|
||||||
// clean up the podSpec
|
// clean up the podSpec
|
||||||
podSpec.setNodeName(null);
|
podSpec.setNodeName(null);
|
||||||
podSpec.setRestartPolicy("Never");
|
podSpec.setRestartPolicy("Never");
|
||||||
podSpec.setHostname(k8sTaskId.getK8sTaskId());
|
podSpec.setHostname(k8sTaskId.getK8sJobName());
|
||||||
podSpec.setTerminationGracePeriodSeconds(taskRunnerConfig.getGraceTerminationPeriodSeconds());
|
podSpec.setTerminationGracePeriodSeconds(taskRunnerConfig.getGraceTerminationPeriodSeconds());
|
||||||
|
|
||||||
PodTemplateSpec podTemplate = new PodTemplateSpec();
|
PodTemplateSpec podTemplate = new PodTemplateSpec();
|
||||||
|
|
|
@ -124,7 +124,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
|
||||||
|
|
||||||
return new JobBuilder()
|
return new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(new K8sTaskId(task).getK8sTaskId())
|
.withName(new K8sTaskId(task).getK8sJobName())
|
||||||
.addToLabels(getJobLabels(taskRunnerConfig, task))
|
.addToLabels(getJobLabels(taskRunnerConfig, task))
|
||||||
.addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
|
.addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
|
|
|
@ -576,7 +576,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
|
||||||
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
|
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
|
||||||
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
|
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
|
||||||
|
|
||||||
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId)).andReturn(Optional.absent());
|
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());
|
||||||
|
|
||||||
replayAll();
|
replayAll();
|
||||||
|
|
||||||
|
@ -598,7 +598,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId)).andReturn(Optional.of(pod));
|
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));
|
||||||
|
|
||||||
replayAll();
|
replayAll();
|
||||||
|
|
||||||
|
@ -623,7 +623,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
|
||||||
.endStatus()
|
.endStatus()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId)).andReturn(Optional.of(pod));
|
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));
|
||||||
|
|
||||||
replayAll();
|
replayAll();
|
||||||
|
|
||||||
|
@ -653,7 +653,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
|
||||||
.endStatus()
|
.endStatus()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId)).andReturn(Optional.of(pod));
|
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));
|
||||||
|
|
||||||
replayAll();
|
replayAll();
|
||||||
|
|
||||||
|
|
|
@ -30,15 +30,15 @@ public class K8sTaskIdTest
|
||||||
public void testModifyingTaskIDToBeK8sCompliant()
|
public void testModifyingTaskIDToBeK8sCompliant()
|
||||||
{
|
{
|
||||||
String original = "coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
|
String original = "coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
|
||||||
String result = new K8sTaskId(original).getK8sTaskId();
|
String result = new K8sTaskId(original).getK8sJobName();
|
||||||
assertEquals("coordinatorissuedcompactk8smetricsaeifmefd20220818t153326094z", result);
|
assertEquals("coordinatorissuedcompactk8smet-2e2c1862cb7ad1d01f4794b27a4438b0", result);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEquals()
|
public void testEquals()
|
||||||
{
|
{
|
||||||
EqualsVerifier.forClass(K8sTaskId.class)
|
EqualsVerifier.forClass(K8sTaskId.class)
|
||||||
.withNonnullFields("k8sTaskId", "originalTaskId")
|
.withNonnullFields("k8sJobName", "originalTaskId")
|
||||||
.usingGetClass()
|
.usingGetClass()
|
||||||
.verify();
|
.verify();
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,4 +59,20 @@ public class KubernetesOverlordUtilsTest
|
||||||
{
|
{
|
||||||
Assert.assertEquals("", KubernetesOverlordUtils.convertTaskIdToK8sLabel(null));
|
Assert.assertEquals("", KubernetesOverlordUtils.convertTaskIdToK8sLabel(null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_stripJobName()
|
||||||
|
{
|
||||||
|
Assert.assertEquals("apiissuedkillwikipedianewbalhn-8916017dfd5469fe9a8881b1035497a2", KubernetesOverlordUtils.convertTaskIdToJobName(
|
||||||
|
"api-issued_kill_wikipedia_new_balhnoib_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:28:42.526Z"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_stripJobName_avoidDuplicatesWithLongDataSourceName()
|
||||||
|
{
|
||||||
|
String jobName1 = KubernetesOverlordUtils.convertTaskIdToJobName("coordinator-issued_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_116_pcgkebcl_2023-07-19T16:53:11.416Z");
|
||||||
|
String jobName2 = KubernetesOverlordUtils.convertTaskIdToJobName("coordinator-issued_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_117_pcgkebcl_2023-07-19T16:53:11.416Z");
|
||||||
|
Assert.assertNotEquals(jobName1, jobName2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
private static final String ID = "id";
|
private static final String ID = "id";
|
||||||
private static final String JOB_NAME = ID;
|
private static final String JOB_NAME = ID;
|
||||||
|
private static final String KUBERNETES_JOB_NAME = KubernetesOverlordUtils.convertTaskIdToJobName(JOB_NAME);
|
||||||
private static final String POD_NAME = "name";
|
private static final String POD_NAME = "name";
|
||||||
private static final String NAMESPACE = "namespace";
|
private static final String NAMESPACE = "namespace";
|
||||||
|
|
||||||
|
@ -66,14 +67,14 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
Pod pod = new PodBuilder()
|
Pod pod = new PodBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(POD_NAME)
|
.withName(POD_NAME)
|
||||||
.addToLabels("job-name", JOB_NAME)
|
.addToLabels("job-name", KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
.withPodIP("ip")
|
.withPodIP("ip")
|
||||||
|
@ -92,12 +93,12 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
server.expect().get()
|
server.expect().get()
|
||||||
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=job-name%3Did")
|
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=job-name%3D" + KUBERNETES_JOB_NAME)
|
||||||
.andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder()
|
.andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder()
|
||||||
.addNewItem()
|
.addNewItem()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
|
@ -119,7 +120,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
.withActive(null)
|
.withActive(null)
|
||||||
|
@ -144,7 +145,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
.withActive(null)
|
.withActive(null)
|
||||||
|
@ -182,7 +183,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -208,7 +209,7 @@ public class KubernetesPeonClientTest
|
||||||
|
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -217,7 +218,7 @@ public class KubernetesPeonClientTest
|
||||||
Assertions.assertTrue(instance.deletePeonJob(new K8sTaskId(ID)));
|
Assertions.assertTrue(instance.deletePeonJob(new K8sTaskId(ID)));
|
||||||
|
|
||||||
Assertions.assertNotNull(
|
Assertions.assertNotNull(
|
||||||
client.batch().v1().jobs().inNamespace(NAMESPACE).withName(ID).get()
|
client.batch().v1().jobs().inNamespace(NAMESPACE).withName(KUBERNETES_JOB_NAME).get()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,10 +238,10 @@ public class KubernetesPeonClientTest
|
||||||
void test_getPeonLogs_withJob_returnsInputStreamInOptional()
|
void test_getPeonLogs_withJob_returnsInputStreamInOptional()
|
||||||
{
|
{
|
||||||
server.expect().get()
|
server.expect().get()
|
||||||
.withPath("/apis/batch/v1/namespaces/namespace/jobs/id")
|
.withPath("/apis/batch/v1/namespaces/namespace/jobs/" + KUBERNETES_JOB_NAME)
|
||||||
.andReturn(HttpURLConnection.HTTP_OK, new JobBuilder()
|
.andReturn(HttpURLConnection.HTTP_OK, new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.withUid("uid")
|
.withUid("uid")
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewSpec()
|
.withNewSpec()
|
||||||
|
@ -289,7 +290,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -311,7 +312,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.addToLabels("druid.k8s.peons", "true")
|
.addToLabels("druid.k8s.peons", "true")
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.build();
|
.build();
|
||||||
|
@ -342,7 +343,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
.withActive(1)
|
.withActive(1)
|
||||||
|
@ -361,7 +362,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.addToLabels("druid.k8s.peons", "true")
|
.addToLabels("druid.k8s.peons", "true")
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
|
@ -381,7 +382,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job job = new JobBuilder()
|
Job job = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.addToLabels("druid.k8s.peons", "true")
|
.addToLabels("druid.k8s.peons", "true")
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
|
@ -401,7 +402,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Job activeJob = new JobBuilder()
|
Job activeJob = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(StringUtils.format("%s-active", JOB_NAME))
|
.withName(StringUtils.format("%s-active", KUBERNETES_JOB_NAME))
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
.withActive(1)
|
.withActive(1)
|
||||||
|
@ -410,7 +411,7 @@ public class KubernetesPeonClientTest
|
||||||
|
|
||||||
Job deletableJob = new JobBuilder()
|
Job deletableJob = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(StringUtils.format("%s-deleteable", JOB_NAME))
|
.withName(StringUtils.format("%s-deleteable", KUBERNETES_JOB_NAME))
|
||||||
.addToLabels("druid.k8s.peons", "true")
|
.addToLabels("druid.k8s.peons", "true")
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
|
@ -420,7 +421,7 @@ public class KubernetesPeonClientTest
|
||||||
|
|
||||||
Job undeletableJob = new JobBuilder()
|
Job undeletableJob = new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(StringUtils.format("%s-undeletable", JOB_NAME))
|
.withName(StringUtils.format("%s-undeletable", KUBERNETES_JOB_NAME))
|
||||||
.addToLabels("druid.k8s.peons", "true")
|
.addToLabels("druid.k8s.peons", "true")
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewStatus()
|
.withNewStatus()
|
||||||
|
@ -443,13 +444,13 @@ public class KubernetesPeonClientTest
|
||||||
Pod pod = new PodBuilder()
|
Pod pod = new PodBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(POD_NAME)
|
.withName(POD_NAME)
|
||||||
.addToLabels("job-name", JOB_NAME)
|
.addToLabels("job-name", KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
client.pods().inNamespace(NAMESPACE).resource(pod).create();
|
client.pods().inNamespace(NAMESPACE).resource(pod).create();
|
||||||
|
|
||||||
Optional<Pod> maybePod = instance.getPeonPod(new K8sTaskId(ID));
|
Optional<Pod> maybePod = instance.getPeonPod(KUBERNETES_JOB_NAME);
|
||||||
|
|
||||||
Assertions.assertTrue(maybePod.isPresent());
|
Assertions.assertTrue(maybePod.isPresent());
|
||||||
}
|
}
|
||||||
|
@ -457,7 +458,7 @@ public class KubernetesPeonClientTest
|
||||||
@Test
|
@Test
|
||||||
void test_getPeonPod_withoutPod_returnsEmptyOptional()
|
void test_getPeonPod_withoutPod_returnsEmptyOptional()
|
||||||
{
|
{
|
||||||
Optional<Pod> maybePod = instance.getPeonPod(new K8sTaskId(ID));
|
Optional<Pod> maybePod = instance.getPeonPod(KUBERNETES_JOB_NAME);
|
||||||
Assertions.assertFalse(maybePod.isPresent());
|
Assertions.assertFalse(maybePod.isPresent());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -465,23 +466,23 @@ public class KubernetesPeonClientTest
|
||||||
void test_getPeonPodWithRetries_withPod_returnsPod()
|
void test_getPeonPodWithRetries_withPod_returnsPod()
|
||||||
{
|
{
|
||||||
server.expect().get()
|
server.expect().get()
|
||||||
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=job-name%3Did")
|
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=job-name%3D" + KUBERNETES_JOB_NAME)
|
||||||
.andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder().build())
|
.andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder().build())
|
||||||
.once();
|
.once();
|
||||||
|
|
||||||
server.expect().get()
|
server.expect().get()
|
||||||
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=job-name%3Did")
|
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=job-name%3D" + KUBERNETES_JOB_NAME)
|
||||||
.andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder()
|
.andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder()
|
||||||
.addNewItem()
|
.addNewItem()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(POD_NAME)
|
.withName(POD_NAME)
|
||||||
.addToLabels("job-name", JOB_NAME)
|
.addToLabels("job-name", KUBERNETES_JOB_NAME)
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.endItem()
|
.endItem()
|
||||||
.build()
|
.build()
|
||||||
).once();
|
).once();
|
||||||
|
|
||||||
Pod pod = instance.getPeonPodWithRetries(new K8sTaskId(ID));
|
Pod pod = instance.getPeonPodWithRetries(new K8sTaskId(ID).getK8sJobName());
|
||||||
|
|
||||||
Assertions.assertNotNull(pod);
|
Assertions.assertNotNull(pod);
|
||||||
}
|
}
|
||||||
|
@ -491,7 +492,7 @@ public class KubernetesPeonClientTest
|
||||||
{
|
{
|
||||||
Assertions.assertThrows(
|
Assertions.assertThrows(
|
||||||
KubernetesResourceNotFoundException.class,
|
KubernetesResourceNotFoundException.class,
|
||||||
() -> instance.getPeonPodWithRetries(clientApi.getClient(), new K8sTaskId(ID), 1, 1),
|
() -> instance.getPeonPodWithRetries(clientApi.getClient(), new K8sTaskId(ID).getK8sJobName(), 1, 1),
|
||||||
StringUtils.format("K8s pod with label: job-name=%s not found", ID)
|
StringUtils.format("K8s pod with label: job-name=%s not found", ID)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -500,10 +501,10 @@ public class KubernetesPeonClientTest
|
||||||
void test_getPeonLogsWatcher_withJob_returnsWatchLogInOptional()
|
void test_getPeonLogsWatcher_withJob_returnsWatchLogInOptional()
|
||||||
{
|
{
|
||||||
server.expect().get()
|
server.expect().get()
|
||||||
.withPath("/apis/batch/v1/namespaces/namespace/jobs/id")
|
.withPath("/apis/batch/v1/namespaces/namespace/jobs/" + KUBERNETES_JOB_NAME)
|
||||||
.andReturn(HttpURLConnection.HTTP_OK, new JobBuilder()
|
.andReturn(HttpURLConnection.HTTP_OK, new JobBuilder()
|
||||||
.withNewMetadata()
|
.withNewMetadata()
|
||||||
.withName(JOB_NAME)
|
.withName(KUBERNETES_JOB_NAME)
|
||||||
.withUid("uid")
|
.withUid("uid")
|
||||||
.endMetadata()
|
.endMetadata()
|
||||||
.withNewSpec()
|
.withNewSpec()
|
||||||
|
|
|
@ -159,7 +159,7 @@ public class DruidPeonClientIntegrationTest
|
||||||
|
|
||||||
// now copy the task.json file from the pod and make sure its the same as our task.json we expected
|
// now copy the task.json file from the pod and make sure its the same as our task.json we expected
|
||||||
Path downloadPath = Paths.get(tempDir.toAbsolutePath().toString(), "task.json");
|
Path downloadPath = Paths.get(tempDir.toAbsolutePath().toString(), "task.json");
|
||||||
Pod mainJobPod = peonClient.getPeonPodWithRetries(taskId);
|
Pod mainJobPod = peonClient.getPeonPodWithRetries(taskId.getK8sJobName());
|
||||||
k8sClient.executeRequest(client -> {
|
k8sClient.executeRequest(client -> {
|
||||||
client.pods()
|
client.pods()
|
||||||
.inNamespace("default")
|
.inNamespace("default")
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
apiVersion: batch/v1
|
apiVersion: batch/v1
|
||||||
kind: Job
|
kind: Job
|
||||||
metadata:
|
metadata:
|
||||||
name: "id"
|
name: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
spec:
|
spec:
|
||||||
template:
|
template:
|
||||||
metadata:
|
metadata:
|
||||||
labels:
|
labels:
|
||||||
job-name: id
|
job-name: id-3e70afe5cd823dfc7dd308eea616426b
|
||||||
name: id-kmwkw
|
name: id-kmwkw
|
||||||
spec:
|
spec:
|
||||||
containers:
|
containers:
|
||||||
|
|
|
@ -6,7 +6,7 @@ metadata:
|
||||||
tls.enabled: "false"
|
tls.enabled: "false"
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
name: "id"
|
name: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
spec:
|
spec:
|
||||||
activeDeadlineSeconds: 14400
|
activeDeadlineSeconds: 14400
|
||||||
backoffLimit: 0
|
backoffLimit: 0
|
||||||
|
@ -60,6 +60,6 @@ spec:
|
||||||
memory: "2400000000"
|
memory: "2400000000"
|
||||||
cpu: "1000m"
|
cpu: "1000m"
|
||||||
ephemeral-storage: 1Gi
|
ephemeral-storage: 1Gi
|
||||||
hostname: "id"
|
hostname: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
restartPolicy: "Never"
|
restartPolicy: "Never"
|
||||||
ttlSecondsAfterFinished: 172800
|
ttlSecondsAfterFinished: 172800
|
|
@ -6,7 +6,7 @@ metadata:
|
||||||
tls.enabled: "false"
|
tls.enabled: "false"
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
name: "id"
|
name: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
spec:
|
spec:
|
||||||
activeDeadlineSeconds: 14400
|
activeDeadlineSeconds: 14400
|
||||||
backoffLimit: 0
|
backoffLimit: 0
|
||||||
|
@ -18,7 +18,7 @@ spec:
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
spec:
|
spec:
|
||||||
hostname: "id"
|
hostname: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
containers:
|
containers:
|
||||||
- args:
|
- args:
|
||||||
- "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1\""
|
- "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1\""
|
||||||
|
|
|
@ -6,7 +6,7 @@ metadata:
|
||||||
tls.enabled: "false"
|
tls.enabled: "false"
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
name: "id"
|
name: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
spec:
|
spec:
|
||||||
activeDeadlineSeconds: 14400
|
activeDeadlineSeconds: 14400
|
||||||
backoffLimit: 0
|
backoffLimit: 0
|
||||||
|
@ -18,7 +18,7 @@ spec:
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
spec:
|
spec:
|
||||||
hostname: "id"
|
hostname: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
containers:
|
containers:
|
||||||
- args:
|
- args:
|
||||||
- "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1\""
|
- "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1\""
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
apiVersion: batch/v1
|
apiVersion: batch/v1
|
||||||
kind: Job
|
kind: Job
|
||||||
metadata:
|
metadata:
|
||||||
name: "id"
|
name: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
druid.task.id: "id"
|
druid.task.id: "id"
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
apiVersion: batch/v1
|
apiVersion: batch/v1
|
||||||
kind: Job
|
kind: Job
|
||||||
metadata:
|
metadata:
|
||||||
name: "apiissuedkillwikipedia3omjobnbc10000101t000000000z20230514t0000"
|
name: "apiissuedkillwikipedia3omjobnb-18ed64f09a02fab468b9bba38739871f"
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
druid.task.id: "apiissuedkillwikipedia3omjobnbc10000101t000000000z20230514t0000"
|
druid.task.id: "apiissuedkillwikipedia3omjobnbc10000101t000000000z20230514t0000"
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
apiVersion: batch/v1
|
apiVersion: batch/v1
|
||||||
kind: Job
|
kind: Job
|
||||||
metadata:
|
metadata:
|
||||||
name: "id"
|
name: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
druid.task.id: "id"
|
druid.task.id: "id"
|
||||||
|
|
|
@ -6,7 +6,7 @@ metadata:
|
||||||
tls.enabled: "false"
|
tls.enabled: "false"
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
name: "id"
|
name: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
spec:
|
spec:
|
||||||
activeDeadlineSeconds: 14400
|
activeDeadlineSeconds: 14400
|
||||||
backoffLimit: 0
|
backoffLimit: 0
|
||||||
|
@ -86,7 +86,7 @@ spec:
|
||||||
name: "graveyard"
|
name: "graveyard"
|
||||||
- mountPath: "/kubexit"
|
- mountPath: "/kubexit"
|
||||||
name: "kubexit"
|
name: "kubexit"
|
||||||
hostname: "id"
|
hostname: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
initContainers:
|
initContainers:
|
||||||
- command:
|
- command:
|
||||||
- "cp"
|
- "cp"
|
||||||
|
|
|
@ -6,7 +6,7 @@ metadata:
|
||||||
tls.enabled: "false"
|
tls.enabled: "false"
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
name: "id"
|
name: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
spec:
|
spec:
|
||||||
activeDeadlineSeconds: 14400
|
activeDeadlineSeconds: 14400
|
||||||
backoffLimit: 0
|
backoffLimit: 0
|
||||||
|
@ -18,7 +18,7 @@ spec:
|
||||||
labels:
|
labels:
|
||||||
druid.k8s.peons: "true"
|
druid.k8s.peons: "true"
|
||||||
spec:
|
spec:
|
||||||
hostname: "id"
|
hostname: "id-3e70afe5cd823dfc7dd308eea616426b"
|
||||||
containers:
|
containers:
|
||||||
- args:
|
- args:
|
||||||
- foo && bar
|
- foo && bar
|
||||||
|
|
Loading…
Reference in New Issue