Set monotonically increasing worker capacity in start-druid-main (#13581)

This commit updates the task memory allocation logic.
- min task count is 2 and max task count is number of cpus on the machine
- task count increases wrt total task memory
- task memory increases from 512m to 2g
This commit is contained in:
Rishabh Singh 2022-12-16 15:34:30 +05:30 committed by GitHub
parent d9e5245ff0
commit f42722e627
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 20 additions and 28 deletions

View File

@ -36,14 +36,6 @@ TASK_JAVA_OPTS_ARRAY = ["-server", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8
TASK_JAVA_OPTS_PROPERTY = "druid.indexer.runner.javaOptsArray" TASK_JAVA_OPTS_PROPERTY = "druid.indexer.runner.javaOptsArray"
TASK_WORKER_CAPACITY_PROPERTY = "druid.worker.capacity" TASK_WORKER_CAPACITY_PROPERTY = "druid.worker.capacity"
TASK_COUNT = "task-count" TASK_COUNT = "task-count"
TASK_MEM_TYPE_LOW = "low"
TASK_MEM_TYPE_MEDIUM = "medium"
TASK_MEM_TYPE_HIGH = "high"
TASK_MEM_MAP = {
TASK_MEM_TYPE_LOW: ["-Xms256m", "-Xmx256m", "-XX:MaxDirectMemorySize=256m"],
TASK_MEM_TYPE_MEDIUM: ["-Xms512m", "-Xmx512m", "-XX:MaxDirectMemorySize=512m"],
TASK_MEM_TYPE_HIGH: ["-Xms1g", "-Xmx1g", "-XX:MaxDirectMemorySize=1g"]
}
BROKER = "broker" BROKER = "broker"
ROUTER = "router" ROUTER = "router"
@ -436,43 +428,43 @@ def check_memory_constraint(total_memory, services):
return int(total_memory * 0.8) return int(total_memory * 0.8)
def build_mm_task_java_opts_array(memory_type): def build_mm_task_java_opts_array(task_memory):
task_memory = '-D{0}=['.format(TASK_JAVA_OPTS_PROPERTY) memory = int(task_memory / 2)
mem_array = ["-Xms{0}m".format(memory), "-Xmx{0}m".format(memory), "-XX:MaxDirectMemorySize={0}m".format(memory)]
mem_array = TASK_MEM_MAP.get(memory_type)
java_opts_list = TASK_JAVA_OPTS_ARRAY + mem_array java_opts_list = TASK_JAVA_OPTS_ARRAY + mem_array
for item in java_opts_list: task_java_opts_value = ''
task_memory += '\"{0}\",'.format(item)
task_memory = task_memory[:-1] for item in java_opts_list:
task_memory += ']' task_java_opts_value += '\"{0}\",'.format(item)
return task_memory
task_java_opts_value = task_java_opts_value[:-1]
task_memory_config = '-D{0}=[{1}]'.format(TASK_JAVA_OPTS_PROPERTY, task_java_opts_value)
return task_memory_config
def compute_tasks_memory(allocated_memory): def compute_tasks_memory(allocated_memory):
if allocated_memory >= 4096: cpu_count = multiprocessing.cpu_count()
task_count = int(allocated_memory / 2048)
memory_type = TASK_MEM_TYPE_HIGH if allocated_memory >= cpu_count * 1024:
task_memory_mb = 2048 task_count = cpu_count
task_memory_mb = min(2048, int(allocated_memory / cpu_count))
elif allocated_memory >= 2048: elif allocated_memory >= 2048:
task_count = int(allocated_memory / 1024) task_count = int(allocated_memory / 1024)
memory_type = TASK_MEM_TYPE_MEDIUM
task_memory_mb = 1024 task_memory_mb = 1024
else: else:
task_count = 2 task_count = 2
memory_type = TASK_MEM_TYPE_LOW task_memory_mb = int(allocated_memory / task_count)
task_memory_mb = 512
task_count = min(task_count, multiprocessing.cpu_count())
return memory_type, task_count, task_memory_mb return task_count, task_memory_mb
def build_memory_config(service, allocated_memory): def build_memory_config(service, allocated_memory):
if service == TASKS: if service == TASKS:
memory_type, task_count, task_memory = compute_tasks_memory(allocated_memory) task_count, task_memory = compute_tasks_memory(allocated_memory)
java_opts_array = build_mm_task_java_opts_array(memory_type) java_opts_array = build_mm_task_java_opts_array(task_memory)
return ['-D{0}={1}'.format(TASK_WORKER_CAPACITY_PROPERTY, task_count), return ['-D{0}={1}'.format(TASK_WORKER_CAPACITY_PROPERTY, task_count),
java_opts_array], task_memory * task_count java_opts_array], task_memory * task_count
elif service == INDEXER: elif service == INDEXER: