diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 69762ba7190..3a06af69d1e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.annotations.SuppressFBWarnings; import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; @@ -520,7 +521,18 @@ public class TaskQueue try { Preconditions.checkState(active, "Queue is not active!"); Preconditions.checkNotNull(task, "task"); - Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %s)", config.getMaxSize()); + if (tasks.size() >= config.getMaxSize()) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.CAPACITY_EXCEEDED) + .build( + StringUtils.format( + "Too many tasks are in the queue (Limit = %d), " + + "(Current active tasks = %d). Retry later or increase the druid.indexer.queue.maxSize", + config.getMaxSize(), + tasks.size() + ) + ); + } // If this throws with any sort of exception, including TaskExistsException, we don't want to // insert the task into our queue. So don't catch it. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index f9604492dd4..01af55eadb3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -34,7 +34,8 @@ import org.apache.druid.audit.AuditManager; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.JacksonConfigManager; -import org.apache.druid.common.exception.DruidException; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; @@ -227,9 +228,15 @@ public class OverlordResource return Response.ok(ImmutableMap.of("task", task.getId())).build(); } catch (DruidException e) { + return Response + .status(e.getStatusCode()) + .entity(new ErrorResponse(e)) + .build(); + } + catch (org.apache.druid.common.exception.DruidException e) { return Response.status(e.getResponseCode()) - .entity(ImmutableMap.of("error", e.getMessage())) - .build(); + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); } } ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index a175b239e44..a1a93e29cbf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.druid.common.guava.DSuppliers; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.WorkerNodeService; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; @@ -182,6 +183,31 @@ public class TaskQueueTest extends IngestionTestBase Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg()); } + @Test(expected = DruidException.class) + public void testTaskErrorWhenExceptionIsThrownDueToQueueSize() + { + final TaskActionClientFactory actionClientFactory = createActionClientFactory(); + final TaskQueue taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(1, null, null, null, null), + new DefaultTaskConfig(), + getTaskStorage(), + new SimpleTaskRunner(actionClientFactory), + actionClientFactory, + getLockbox(), + new NoopServiceEmitter() + ); + taskQueue.setActive(true); + + // Create a Task and add it to the TaskQueue + final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M")); + final TestTask task2 = new TestTask("t2", Intervals.of("2021-01/P1M")); + taskQueue.add(task1); + + // we will get exception here as taskQueue size is 1 druid.indexer.queue.maxSize is already 1 + taskQueue.add(task2); + } + @Test public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 7a587bb196e..23ff4038633 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -66,7 +66,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; -import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException;