Add limit to task payload size (#16512)

* Add limit to task payload size

* Change to a warning

* Remove test

* Fix unit tests

* Optionally throw alert

* PR comments

* Update indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* PR comments

* Reject large payloads

* Update docs/configuration/index.md

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* Update indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

---------

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
This commit is contained in:
George Shiqi Wu 2024-05-31 09:17:36 -07:00 committed by GitHub
parent b5b900b6a0
commit 0936798122
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 128 additions and 10 deletions

View File

@ -1126,6 +1126,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself (for example, after a widespread network issue).|`PT1M`|
|`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|`PT30S`|
|`druid.indexer.queue.storageSyncRate`|Sync Overlord state this often with an underlying task persistence mechanism.|`PT1M`|
|`druid.indexer.queue.maxTaskPayloadSize`|Maximum allowed size in bytes of a single task payload accepted by the Overlord.|none (allow all task payload sizes)|
The following configs only apply if the Overlord is running in remote mode. For a description of local vs. remote mode, see [Overlord service](../design/overlord.md).

View File

@ -35,6 +35,7 @@ import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.EntryAlreadyExists;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@ -98,6 +99,9 @@ public class TaskQueue
private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);
private static final long MIN_WAIT_TIME_MS = 100;
// 60 MB warning threshold since 64 MB is the default max_allowed_packet size in MySQL 8+
private static final long TASK_SIZE_WARNING_THRESHOLD = 1024 * 1024 * 60;
// Task ID -> Task, for tasks that are active in some way (submitted, running, or finished and to-be-cleaned-up).
@GuardedBy("giant")
private final LinkedHashMap<String, Task> tasks = new LinkedHashMap<>();
@ -508,6 +512,7 @@ public class TaskQueue
if (taskStorage.getTask(task.getId()).isPresent()) {
throw EntryAlreadyExists.exception("Task[%s] already exists", task.getId());
}
validateTaskPayload(task);
// Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
@ -1018,4 +1023,34 @@ public class TaskQueue
giant.unlock();
}
}
private void validateTaskPayload(Task task)
{
try {
String payload = passwordRedactingMapper.writeValueAsString(task);
if (config.getMaxTaskPayloadSize() != null && config.getMaxTaskPayloadSize().getBytesInInt() < payload.length()) {
throw InvalidInput.exception(
"Task[%s] has payload of size[%d] but max allowed size is [%d]. " +
"Reduce the size of the task payload or increase 'druid.indexer.queue.maxTaskPayloadSize'.",
task.getId(), payload.length(), config.getMaxTaskPayloadSize()
);
} else if (payload.length() > TASK_SIZE_WARNING_THRESHOLD) {
log.warn(
"Task[%s] of datasource[%s] has payload size[%d] larger than the recommended maximum[%d]. " +
"Large task payloads may cause stability issues in the Overlord and may fail while persisting to the metadata store." +
"Such tasks may be rejected by the Overlord in future Druid versions.",
task.getId(),
task.getDataSource(),
payload.length(),
TASK_SIZE_WARNING_THRESHOLD
);
}
}
catch (JsonProcessingException e) {
log.error(e, "Failed to parse task payload for validation");
throw DruidException.defensive(
"Failed to parse task payload for validation"
);
}
}
}

View File

@ -22,9 +22,12 @@ package org.apache.druid.indexing.overlord.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
public class TaskQueueConfig
{
@JsonProperty
@ -42,13 +45,17 @@ public class TaskQueueConfig
@JsonProperty
private int taskCompleteHandlerNumThreads;
@JsonProperty
private HumanReadableBytes maxTaskPayloadSize;
@JsonCreator
public TaskQueueConfig(
@JsonProperty("maxSize") final Integer maxSize,
@JsonProperty("startDelay") final Period startDelay,
@JsonProperty("restartDelay") final Period restartDelay,
@JsonProperty("storageSyncRate") final Period storageSyncRate,
@JsonProperty("taskCompleteHandlerNumThreads") final Integer taskCompleteHandlerNumThreads
@JsonProperty("taskCompleteHandlerNumThreads") final Integer taskCompleteHandlerNumThreads,
@JsonProperty("maxTaskPayloadSize") @Nullable final HumanReadableBytes maxTaskPayloadSize
)
{
this.maxSize = Configs.valueOrDefault(maxSize, Integer.MAX_VALUE);
@ -56,6 +63,7 @@ public class TaskQueueConfig
this.startDelay = defaultDuration(startDelay, "PT1M");
this.restartDelay = defaultDuration(restartDelay, "PT30S");
this.storageSyncRate = defaultDuration(storageSyncRate, "PT1M");
this.maxTaskPayloadSize = maxTaskPayloadSize;
}
public int getMaxSize()
@ -83,6 +91,11 @@ public class TaskQueueConfig
return storageSyncRate;
}
public HumanReadableBytes getMaxTaskPayloadSize()
{
return maxTaskPayloadSize;
}
private static Duration defaultDuration(final Period period, final String theDefault)
{
return (period == null ? new Period(theDefault) : period).toStandardDuration();

View File

@ -138,7 +138,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
);
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(0L), null, null, null),
new TaskQueueConfig(null, new Period(0L), null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,

View File

@ -151,7 +151,7 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
);
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(0L), null, null, null),
new TaskQueueConfig(null, new Period(0L), null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,

View File

@ -109,7 +109,7 @@ public class TaskLockConfigTest
} else {
lockConfig = new TaskLockConfig();
}
final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null, null, null);
final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null, null, null, null);
final TaskRunner taskRunner = EasyMock.createNiceMock(RemoteTaskRunner.class);
final TaskActionClientFactory actionClientFactory = EasyMock.createNiceMock(LocalTaskActionClientFactory.class);
final TaskLockbox lockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator());

View File

@ -122,7 +122,7 @@ public class TaskQueueScaleTest
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, Period.millis(1), null, null, null),
new TaskQueueConfig(null, Period.millis(1), null, null, null, null),
new DefaultTaskConfig(),
taskStorage,
taskRunner,

View File

@ -71,6 +71,7 @@ import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
@ -94,6 +95,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -119,7 +121,7 @@ public class TaskQueueTest extends IngestionTestBase
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(3, null, null, null, null),
new TaskQueueConfig(3, null, null, null, null, null),
new DefaultTaskConfig()
{
@Override
@ -214,6 +216,73 @@ public class TaskQueueTest extends IngestionTestBase
);
}
@Test
public void testAddThrowsExceptionWhenPayloadIsTooLarge()
{
HumanReadableBytes maxPayloadSize10Mib = HumanReadableBytes.valueOf(10 * 1024 * 1024);
TaskQueue maxPayloadTaskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(3, null, null, null, null, maxPayloadSize10Mib),
new DefaultTaskConfig()
{
@Override
public Map<String, Object> getContext()
{
return defaultTaskContext;
}
},
getTaskStorage(),
new SimpleTaskRunner(),
actionClientFactory,
getLockbox(),
serviceEmitter,
getObjectMapper(),
new NoopTaskContextEnricher()
);
maxPayloadTaskQueue.setActive();
// 1 MB is not too large
char[] context = new char[1024 * 1024];
Arrays.fill(context, 'a');
maxPayloadTaskQueue.add(
new TestTask(
"tx",
Intervals.of("2021-01/P1M"),
ImmutableMap.of(
"contextKey", new String(context)
)
)
);
// 100 MB is too large
char[] contextLarge = new char[100 * 1024 * 1024];
Arrays.fill(contextLarge, 'a');
Assert.assertThrows(
DruidException.class,
() -> maxPayloadTaskQueue.add(
new TestTask(
"tx2",
Intervals.of("2021-01/P1M"),
ImmutableMap.of(
"contextKey", new String(contextLarge)
)
)
)
);
// If no limit is set, don't throw anything
taskQueue.add(
new TestTask(
"tx3",
Intervals.of("2021-01/P1M"),
ImmutableMap.of(
"contextKey", new String(contextLarge)
)
)
);
}
@Test
public void testAddedTaskUsesLineageBasedSegmentAllocationByDefault()
{
@ -336,7 +405,7 @@ public class TaskQueueTest extends IngestionTestBase
EasyMock.replay(workerHolder);
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new TaskQueueConfig(null, null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,
@ -424,7 +493,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new TaskQueueConfig(null, null, null, null, null, null),
new DefaultTaskConfig(),
taskStorage,
taskRunner,
@ -469,7 +538,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new TaskQueueConfig(null, null, null, null, null, null),
new DefaultTaskConfig(),
taskStorage,
EasyMock.createMock(HttpRemoteTaskRunner.class),

View File

@ -236,7 +236,7 @@ public class OverlordTest
taskMaster = new TaskMaster(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(1), null, new Period(10), null),
new TaskQueueConfig(null, new Period(1), null, new Period(10), null, null),
new DefaultTaskConfig(),
taskLockbox,
taskStorage,