Returning correct Response Code HTTP 429 when taskQueue reached maxSize (#15409)

Currently when we submit a task to druid and number of currently active tasks has already reached (druid.indexer.queue.maxSize) then 500 ISE is thrown as per shown in the screenshot in #15380.

This fix will return HTTP 429 Too Many Requests(with proper error message) instead of 500 ISE, when we submit a task and queueSize has reached.
This commit is contained in:
Sachidananda Maharana 2023-11-22 14:27:20 +05:30 committed by GitHub
parent dff5bcb0a6
commit 2f269fe065
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 5 deletions

View File

@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
@ -520,7 +521,18 @@ public class TaskQueue
try {
Preconditions.checkState(active, "Queue is not active!");
Preconditions.checkNotNull(task, "task");
Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %s)", config.getMaxSize());
if (tasks.size() >= config.getMaxSize()) {
throw DruidException.forPersona(DruidException.Persona.ADMIN)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build(
StringUtils.format(
"Too many tasks are in the queue (Limit = %d), " +
"(Current active tasks = %d). Retry later or increase the druid.indexer.queue.maxSize",
config.getMaxSize(),
tasks.size()
)
);
}
// If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue. So don't catch it.

View File

@ -34,7 +34,8 @@ import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.exception.DruidException;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
@ -227,9 +228,15 @@ public class OverlordResource
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (DruidException e) {
return Response
.status(e.getStatusCode())
.entity(new ErrorResponse(e))
.build();
}
catch (org.apache.druid.common.exception.DruidException e) {
return Response.status(e.getResponseCode())
.entity(ImmutableMap.of("error", e.getMessage()))
.build();
.entity(ImmutableMap.of("error", e.getMessage()))
.build();
}
}
);

View File

@ -28,6 +28,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
@ -182,6 +183,31 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg());
}
@Test(expected = DruidException.class)
public void testTaskErrorWhenExceptionIsThrownDueToQueueSize()
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(1, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter()
);
taskQueue.setActive(true);
// Create a Task and add it to the TaskQueue
final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M"));
final TestTask task2 = new TestTask("t2", Intervals.of("2021-01/P1M"));
taskQueue.add(task1);
// we will get exception here as taskQueue size is 1 druid.indexer.queue.maxSize is already 1
taskQueue.add(task2);
}
@Test
public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException
{

View File

@ -66,7 +66,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;