Support broadcast segmetns (#14789)

This commit is contained in:
George Shiqi Wu 2023-08-11 14:14:05 -04:00 committed by GitHub
parent ec28672d07
commit c8a11702db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 50 additions and 1 deletions

View File

@ -34,6 +34,7 @@ public class DruidK8sConstants
public static final String TASK_JSON_ENV = "TASK_JSON";
public static final String TASK_DIR_ENV = "TASK_DIR";
public static final String TASK_ID_ENV = "TASK_ID";
public static final String LOAD_BROADCAST_SEGMENTS_ENV = "LOAD_BROADCAST_SEGMENTS";
public static final String JAVA_OPTS = "JAVA_OPTS";
public static final String DRUID_HOST_ENV = "druid_host";
public static final String DRUID_HOSTNAME_ENV = "HOSTNAME";

View File

@ -224,7 +224,11 @@ public class PodTemplateTaskAdapter implements TaskAdapter
.withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector(
null,
StringUtils.format("metadata.annotations['%s']", DruidK8sConstants.TASK)
)).build()).build()
)).build()).build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
.withValue(Boolean.toString(task.supportsQueries()))
.build()
);
}

View File

@ -36,6 +36,7 @@ import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.server.DruidNode;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@ -48,6 +49,7 @@ import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
public class PodTemplateTaskAdapterTest
{
@ -354,6 +356,42 @@ public class PodTemplateTaskAdapterTest
assertJobSpecsEqual(actual, expected);
}
@Test
public void test_fromTask_taskSupportsQueries() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("noop.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.queryable", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
taskRunnerConfig,
taskConfig,
node,
mapper,
props
);
Task task = EasyMock.mock(Task.class);
EasyMock.expect(task.supportsQueries()).andReturn(true);
EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes();
EasyMock.replay(task);
Job actual = adapter.fromTask(task);
EasyMock.verify(task);
Assertions.assertEquals("true", actual.getSpec().getTemplate()
.getSpec().getContainers()
.get(0).getEnv().stream()
.filter(env -> env.getName().equals(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV))
.collect(Collectors.toList()).get(0).getValue());
}
private void assertJobSpecsEqual(Job actual, Job expected) throws IOException

View File

@ -46,5 +46,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
image: one
name: primary

View File

@ -46,5 +46,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
image: one
name: primary

View File

@ -46,5 +46,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
image: one
name: primary