Hook up pod template adapter (#13966)

* Hook up PodTemplateTaskAdapter

* Make task adapter TYPE parameters final

* Rename adapters types

* Include specified adapter name in exception message

* Documentation for sidecarSupport deprecation

* Fix order

* Set TASK_ID as environment variable in PodTemplateTaskAdapter (#13969)

* Update docs/development/extensions-contrib/k8s-jobs.md

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>

* Hook up PodTemplateTaskAdapter

* Make task adapter TYPE parameters final

* Rename adapters types

* Include specified adapter name in exception message

* Documentation for sidecarSupport deprecation

* Fix order

* fix spelling errors

---------

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
This commit is contained in:
Nicholas Lippis 2023-03-24 14:13:46 -04:00 committed by GitHub
parent 976d39281f
commit 8a72544bd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 221 additions and 32 deletions

View File

@ -28,7 +28,22 @@ Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it ha
## How it works ## How it works
The K8s extension takes the podSpec of your `Overlord` pod and creates a kubernetes job from this podSpec. Thus if you have sidecars such as Splunk or Istio it can optionally launch a task as a K8s job. All jobs are natively restorable, they are decoupled from the druid deployment, thus restarting pods or doing upgrades has no affect on tasks in flight. They will continue to run and when the overlord comes back up it will start tracking them again. The K8s extension builds a pod spec using the specified pod adapter, the default implementation takes the podSpec of your `Overlord` pod and creates a kubernetes job from this podSpec. Thus if you have sidecars such as Splunk or Istio it can optionally launch a task as a K8s job. All jobs are natively restorable, they are decoupled from the druid deployment, thus restarting pods or doing upgrades has no affect on tasks in flight. They will continue to run and when the overlord comes back up it will start tracking them again.
## Pod Adapters
The logic defining how the pod template is built for your kubernetes job depends on which pod adapter you have specified.
### Overlord Single Container Pod Adapter
The overlord single container pod adapter takes the podSpec of your `Overlord` pod and creates a kubernetes job from this podSpec. This is the default pod adapter implementation, to explicitly enable it you can specify the runtime property `druid.indexer.runner.k8s.adapter.type: overlordSingleContainer`
### Overlord Multi Container Pod Adapter
The overlord multi container pod adapter takes the podSpec of your `Overlord` pod and creates a kubernetes job from this podSpec. It uses kubexit to manage dependency ordering between the main container that runs your druid peon and other sidecars defined in the `Overlord` pod spec. To enable this pod adapter you can specify the runtime property `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer`
### Custom Template Pod Adapter
The custom template pod adapter allows you to specify a pod template file per task type. This adapter requires you to specify a `base` pod spec which will be used in the case that a task specific pod spec has not been defined. To enable this pod adapter you can specify the runtime property `druid.indexer.runner.k8s.adapter.type: customTemplateAdapter`
The base pod template must be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.base: /path/to/basePodSpec.yaml`
Task specific pod templates must be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.{taskType}: /path/to/taskSpecificPodSpec.yaml` where {taskType} is the name of the task type i.e `index_parallel`
## Configuration ## Configuration
@ -55,7 +70,7 @@ Additional Configuration
|Property|Possible Values|Description|Default|required| |Property|Possible Values|Description|Default|required|
|--------|---------------|-----------|-------|--------| |--------|---------------|-----------|-------|--------|
|`druid.indexer.runner.debugJobs`|`boolean`|Clean up K8s jobs after tasks complete.|False|No| |`druid.indexer.runner.debugJobs`|`boolean`|Clean up K8s jobs after tasks complete.|False|No|
|`druid.indexer.runner.sidecarSupport`|`boolean`|If your overlord pod has sidecars, this will attempt to start the task with the same sidecars as the overlord pod.|False|No| |`druid.indexer.runner.sidecarSupport`|`boolean`|Deprecated, specify adapter type as runtime property `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer` instead. If your overlord pod has sidecars, this will attempt to start the task with the same sidecars as the overlord pod.|False|No|
|`druid.indexer.runner.primaryContainerName`|`String`|If running with sidecars, the `primaryContainerName` should be that of your druid container like `druid-overlord`.|First container in `podSpec` list|No| |`druid.indexer.runner.primaryContainerName`|`String`|If running with sidecars, the `primaryContainerName` should be that of your druid container like `druid-overlord`.|First container in `podSpec` list|No|
|`druid.indexer.runner.kubexitImage`|`String`|Used kubexit project to help shutdown sidecars when the main pod completes. Otherwise jobs with sidecars never terminate.|karlkfi/kubexit:latest|No| |`druid.indexer.runner.kubexitImage`|`String`|Used kubexit project to help shutdown sidecars when the main pod completes. Otherwise jobs with sidecars never terminate.|karlkfi/kubexit:latest|No|
|`druid.indexer.runner.disableClientProxy`|`boolean`|Use this if you have a global http(s) proxy and you wish to bypass it.|false|No| |`druid.indexer.runner.disableClientProxy`|`boolean`|Use this if you have a global http(s) proxy and you wish to bypass it.|false|No|

View File

@ -38,6 +38,12 @@ public class KubernetesTaskRunnerConfig
@JsonProperty @JsonProperty
public boolean debugJobs = false; public boolean debugJobs = false;
/**
* Deprecated, please specify adapter type runtime property instead
*
* I.E `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer`
*/
@Deprecated
@JsonProperty @JsonProperty
public boolean sidecarSupport = false; public boolean sidecarSupport = false;

View File

@ -23,20 +23,26 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.Config;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile; import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesPeonClient; import org.apache.druid.k8s.overlord.common.DruidKubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.K8sTaskAdapter;
import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter; import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.common.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter; import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig; import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.tasklogs.TaskLogPusher;
import java.util.Locale;
import java.util.Properties;
public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<KubernetesTaskRunner> public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<KubernetesTaskRunner>
{ {
public static final String TYPE_NAME = "k8s"; public static final String TYPE_NAME = "k8s";
@ -47,6 +53,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final TaskLogPusher taskLogPusher; private final TaskLogPusher taskLogPusher;
private final DruidNode druidNode; private final DruidNode druidNode;
private final TaskConfig taskConfig; private final TaskConfig taskConfig;
private final Properties properties;
private KubernetesTaskRunner runner; private KubernetesTaskRunner runner;
@ -58,7 +65,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
@JacksonInject TaskQueueConfig taskQueueConfig, @JacksonInject TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher, TaskLogPusher taskLogPusher,
@Self DruidNode druidNode, @Self DruidNode druidNode,
TaskConfig taskConfig TaskConfig taskConfig,
Properties properties
) )
{ {
@ -69,6 +77,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
this.taskLogPusher = taskLogPusher; this.taskLogPusher = taskLogPusher;
this.druidNode = druidNode; this.druidNode = druidNode;
this.taskConfig = taskConfig; this.taskConfig = taskConfig;
this.properties = properties;
} }
@Override @Override
@ -84,29 +93,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
client = new DruidKubernetesClient(); client = new DruidKubernetesClient();
} }
K8sTaskAdapter adapter;
if (kubernetesTaskRunnerConfig.sidecarSupport) {
adapter = new MultiContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
);
} else {
adapter = new SingleContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
);
}
runner = new KubernetesTaskRunner( runner = new KubernetesTaskRunner(
adapter, buildTaskAdapter(client),
kubernetesTaskRunnerConfig, kubernetesTaskRunnerConfig,
taskQueueConfig, taskQueueConfig,
taskLogPusher, taskLogPusher,
@ -120,4 +108,51 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
{ {
return runner; return runner;
} }
private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
{
String adapter = properties.getProperty(String.format(
Locale.ROOT,
"%s.%s.adapter.type",
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX,
TYPE_NAME
));
if (adapter != null && !MultiContainerTaskAdapter.TYPE.equals(adapter) && kubernetesTaskRunnerConfig.sidecarSupport) {
throw new IAE(
"Invalid pod adapter [%s], only pod adapter [%s] can be specified when sidecarSupport is enabled",
adapter,
MultiContainerTaskAdapter.TYPE
);
}
if (MultiContainerTaskAdapter.TYPE.equals(adapter) || kubernetesTaskRunnerConfig.sidecarSupport) {
return new MultiContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
);
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
druidNode,
smileMapper,
properties
);
} else {
return new SingleContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
);
}
}
} }

View File

@ -47,7 +47,7 @@ import java.util.Map;
public class MultiContainerTaskAdapter extends K8sTaskAdapter public class MultiContainerTaskAdapter extends K8sTaskAdapter
{ {
public static String TYPE = "MultiContainer"; public static final String TYPE = "overlordMultiContainer";
public MultiContainerTaskAdapter( public MultiContainerTaskAdapter(
KubernetesClientApi client, KubernetesClientApi client,

View File

@ -69,7 +69,7 @@ import java.util.Properties;
*/ */
public class PodTemplateTaskAdapter implements TaskAdapter public class PodTemplateTaskAdapter implements TaskAdapter
{ {
public static String TYPE = "PodTemplate"; public static final String TYPE = "customTemplateAdapter";
private static final Logger log = new Logger(PodTemplateTaskAdapter.class); private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s"; private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";

View File

@ -35,7 +35,7 @@ import java.util.Map;
public class SingleContainerTaskAdapter extends K8sTaskAdapter public class SingleContainerTaskAdapter extends K8sTaskAdapter
{ {
public static String TYPE = "SingleContainer"; public static final String TYPE = "overlordSingleContainer";
public SingleContainerTaskAdapter( public SingleContainerTaskAdapter(
KubernetesClientApi client, KubernetesClientApi client,

View File

@ -24,7 +24,9 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter; import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.common.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter; import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig; import org.apache.druid.server.log.StartupLoggingConfig;
@ -34,6 +36,9 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.net.URL;
import java.util.Properties;
public class KubernetesTaskRunnerFactoryTest public class KubernetesTaskRunnerFactoryTest
{ {
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
@ -43,6 +48,7 @@ public class KubernetesTaskRunnerFactoryTest
private TaskLogPusher taskLogPusher; private TaskLogPusher taskLogPusher;
private DruidNode druidNode; private DruidNode druidNode;
private TaskConfig taskConfig; private TaskConfig taskConfig;
private Properties properties;
@Before @Before
public void setup() public void setup()
@ -83,6 +89,7 @@ public class KubernetesTaskRunnerFactoryTest
false, false,
ImmutableList.of("/tmp") ImmutableList.of("/tmp")
); );
properties = new Properties();
} }
@Test @Test
@ -95,7 +102,8 @@ public class KubernetesTaskRunnerFactoryTest
taskQueueConfig, taskQueueConfig,
taskLogPusher, taskLogPusher,
druidNode, druidNode,
taskConfig taskConfig,
properties
); );
KubernetesTaskRunner expectedRunner = factory.build(); KubernetesTaskRunner expectedRunner = factory.build();
@ -114,7 +122,8 @@ public class KubernetesTaskRunnerFactoryTest
taskQueueConfig, taskQueueConfig,
taskLogPusher, taskLogPusher,
druidNode, druidNode,
taskConfig taskConfig,
properties
); );
KubernetesTaskRunner runner = factory.build(); KubernetesTaskRunner runner = factory.build();
@ -135,7 +144,8 @@ public class KubernetesTaskRunnerFactoryTest
taskQueueConfig, taskQueueConfig,
taskLogPusher, taskLogPusher,
druidNode, druidNode,
taskConfig taskConfig,
properties
); );
KubernetesTaskRunner runner = factory.build(); KubernetesTaskRunner runner = factory.build();
@ -143,4 +153,127 @@ public class KubernetesTaskRunnerFactoryTest
Assert.assertNotNull(runner); Assert.assertNotNull(runner);
Assert.assertTrue(runner.adapter instanceof MultiContainerTaskAdapter); Assert.assertTrue(runner.adapter instanceof MultiContainerTaskAdapter);
} }
@Test
public void test_build_withSingleContainerAdapterType_returnsKubernetesTaskRunnerWithSingleContainerTaskAdapter()
{
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.adapter.type", "overlordSingleContainer");
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
druidNode,
taskConfig,
props
);
KubernetesTaskRunner runner = factory.build();
Assert.assertNotNull(runner);
Assert.assertTrue(runner.adapter instanceof SingleContainerTaskAdapter);
}
@Test
public void test_build_withSingleContainerAdapterTypeAndSidecarSupport_throwsIAE()
{
kubernetesTaskRunnerConfig.sidecarSupport = true;
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.adapter.type", "overlordSingleContainer");
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
druidNode,
taskConfig,
props
);
Assert.assertThrows(
"Invalid pod adapter [overlordSingleContainer], only pod adapter [overlordMultiContainer] can be specified when sidecarSupport is enabled",
IAE.class,
factory::build
);
}
@Test
public void test_build_withMultiContainerAdapterType_returnsKubernetesTaskRunnerWithMultiContainerTaskAdapter()
{
kubernetesTaskRunnerConfig.sidecarSupport = true;
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.adapter.type", "overlordMultiContainer");
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
druidNode,
taskConfig,
props
);
KubernetesTaskRunner runner = factory.build();
Assert.assertNotNull(runner);
Assert.assertTrue(runner.adapter instanceof MultiContainerTaskAdapter);
}
@Test
public void test_build_withMultiContainerAdapterTypeAndSidecarSupport_returnsKubernetesTaskRunnerWithMultiContainerTaskAdapter()
{
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.adapter.type", "overlordMultiContainer");
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
druidNode,
taskConfig,
props
);
KubernetesTaskRunner runner = factory.build();
Assert.assertNotNull(runner);
Assert.assertTrue(runner.adapter instanceof MultiContainerTaskAdapter);
}
@Test
public void test_build_withPodTemplateAdapterType_returnsKubernetesTaskRunnerWithPodTemplateTaskAdapter()
{
URL url = this.getClass().getClassLoader().getResource("basePodTemplate.yaml");
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.adapter.type", "customTemplateAdapter");
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", url.getPath());
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
druidNode,
taskConfig,
props
);
KubernetesTaskRunner runner = factory.build();
Assert.assertNotNull(runner);
Assert.assertTrue(runner.adapter instanceof PodTemplateTaskAdapter);
}
} }