diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 7bb57c5e1c2..9cc9e4dae74 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -42,8 +42,6 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.ControllerContext; @@ -232,9 +230,8 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, if (taskLock == null) { return false; - } else if (taskLock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked", interval)); } + taskLock.assertNotRevoked(); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 309eb3830ac..e0eee251f72 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -20,6 +20,14 @@ package org.apache.druid.msq.indexing; import com.google.common.collect.ImmutableList; +import org.apache.druid.error.DruidException; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TimeChunkLock; +import org.apache.druid.indexing.common.actions.TaskAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; @@ -28,27 +36,31 @@ import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; import java.util.Collections; +import java.util.List; public class MSQControllerTaskTest { + private final List INTERVALS = + Collections.singletonList(Intervals.of( + "2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")); + private final MSQSpec MSQ_SPEC = MSQSpec .builder() .destination(new DataSourceMSQDestination( "target", Granularities.DAY, null, - null + INTERVALS )) .query(new Druids.ScanQueryBuilder() .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .legacy(false) - .intervals(new MultipleIntervalSegmentSpec( - Collections.singletonList(Intervals.of( - "2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")))) + .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) .dataSource("target") .build() ) @@ -88,4 +100,84 @@ public class MSQControllerTaskTest ); Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId()); } + + @Test + public void testIsReady() throws Exception + { + final String taskId = "taskId"; + MSQControllerTask controllerTask = new MSQControllerTask( + taskId, + MSQ_SPEC, + null, + null, + null, + null, + null, + null + ); + TestTaskActionClient taskActionClient = new TestTaskActionClient( + new TimeChunkLock( + TaskLockType.REPLACE, + "groupId", + "dataSource", + INTERVALS.get(0), + "0", + 0 + ) + ); + Assert.assertTrue(controllerTask.isReady(taskActionClient)); + } + + @Test + public void testIsReadyWithRevokedLock() + { + final String taskId = "taskId"; + MSQControllerTask controllerTask = new MSQControllerTask( + taskId, + MSQ_SPEC, + null, + null, + null, + null, + null, + null + ); + TestTaskActionClient taskActionClient = new TestTaskActionClient( + new TimeChunkLock( + TaskLockType.REPLACE, + "groupId", + "dataSource", + INTERVALS.get(0), + "0", + 0, + true + ) + ); + DruidException exception = Assert.assertThrows( + DruidException.class, + () -> controllerTask.isReady(taskActionClient)); + Assert.assertEquals( + "Lock of type[REPLACE] for interval[2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z] was revoked", + exception.getMessage()); + } + + private static class TestTaskActionClient implements TaskActionClient + { + private final TaskLock taskLock; + + TestTaskActionClient(TaskLock taskLock) + { + this.taskLock = taskLock; + } + + @SuppressWarnings("unchecked") + @Override + public RetType submit(TaskAction taskAction) + { + if (!(taskAction instanceof TimeChunkLockTryAcquireAction)) { + throw new ISE("action[%s] is not supported", taskAction); + } + return (RetType) taskLock; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java index 8c1e5d25603..eb96eb4fed9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.LockRequest; import org.joda.time.Interval; @@ -62,4 +63,18 @@ public interface TaskLock boolean isRevoked(); boolean conflict(LockRequest request); + + /** + * Checks if the lock is revoked and throws a {@link DruidException} if so. + * + * @throws DruidException if the lock is revoked. + */ + default void assertNotRevoked() + { + if (isRevoked()) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Lock of type[%s] for interval[%s] was revoked", getType(), getInterval()); + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 53daa6cc5e9..9fe3b78ee2d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -59,7 +59,6 @@ import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Stopwatch; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; @@ -486,9 +485,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask if (lock == null) { return false; } - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", cur)); - } + lock.assertNotRevoked(); locksAcquired++; intervalToLockVersion.put(cur, lock.getVersion()); } @@ -829,9 +826,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask "Cannot acquire a lock for interval[%s]", interval ); - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval)); - } + lock.assertNotRevoked(); version = lock.getVersion(); } else { version = existingLockVersion; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java index 2b527c8c39b..0c2a6ca04ec 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java @@ -30,7 +30,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; import org.joda.time.Interval; import java.io.IOException; @@ -88,9 +87,7 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask if (lock == null) { return false; } - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval)); - } + lock.assertNotRevoked(); return true; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 81abc86e954..1230711e57b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -337,9 +337,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask if (lock == null) { return false; } - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segmentId.getInterval())); - } + lock.assertNotRevoked(); return true; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 21b17830668..4a31be24dd2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -52,7 +52,6 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; @@ -221,9 +220,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler if (lock == null) { return false; } - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval)); - } + lock.assertNotRevoked(); return true; } else { return true; @@ -420,9 +417,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler ), "Cannot acquire a lock for interval[%s]", interval ); - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval)); - } + lock.assertNotRevoked(); version = lock.getVersion(); } else { Iterable locks = getTaskLocks(toolbox.getTaskActionClient()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index 9ffebac1340..a32ef91192b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -42,7 +42,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -263,9 +262,7 @@ public class RealtimeIndexTask extends AbstractTask "Cannot acquire a lock for interval[%s]", segment.getInterval() ); - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segment.getInterval())); - } + lock.assertNotRevoked(); toolbox.getSegmentAnnouncer().announceSegment(segment); } @@ -292,9 +289,7 @@ public class RealtimeIndexTask extends AbstractTask "Cannot acquire a lock for interval[%s]", segment.getInterval() ); - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segment.getInterval())); - } + lock.assertNotRevoked(); } toolbox.getSegmentAnnouncer().announceSegments(segments); } @@ -346,9 +341,7 @@ public class RealtimeIndexTask extends AbstractTask "Cannot acquire a lock for interval[%s]", interval ); - if (lock.isRevoked()) { - throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval)); - } + lock.assertNotRevoked(); return lock.getVersion(); } catch (IOException e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 70a8869ee52..6036fe630ba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -424,15 +424,16 @@ public class TaskQueue catch (Exception e) { log.warn(e, "Exception thrown during isReady for task: %s", task.getId()); final String errorMessage; - if (e instanceof MaxAllowedLocksExceededException) { + if (e instanceof MaxAllowedLocksExceededException || e instanceof DruidException) { errorMessage = e.getMessage(); } else { errorMessage = StringUtils.format( "Encountered error[%s] while waiting for task to be ready. See Overlord logs for more details.", - StringUtils.chop(e.getMessage(), 100) + e.getMessage() ); } - notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage); + TaskStatus taskStatus = TaskStatus.failure(task.getId(), errorMessage); + notifyStatus(task, taskStatus, taskStatus.getErrorMsg()); continue; } if (taskIsReady) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 94ce367fc84..b5eefcdbc96 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -464,9 +464,7 @@ public abstract class SeekableStreamIndexTaskRunner MAX_ERROR_MSG_LENGTH) { - return errorMsg.substring(0, MAX_ERROR_MSG_LENGTH) + "..."; + if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_TRUNCATION_LIMIT) { + return errorMsg.substring(0, MAX_ERROR_MSG_TRUNCATION_LIMIT / 2) + + "..." + (errorMsg.length() - MAX_ERROR_MSG_TRUNCATION_LIMIT) + " characters omitted..." + + errorMsg.substring(errorMsg.length() - MAX_ERROR_MSG_TRUNCATION_LIMIT / 2); } else { return errorMsg; } diff --git a/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java b/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java index d0cf6b3d2cb..a8a320de412 100644 --- a/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java +++ b/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java @@ -27,6 +27,32 @@ import java.io.IOException; public class TaskStatusTest { + static final String STACK_TRACE = + "org.apache.druid.java.util.common.ISE: Lock for interval [2024-04-23T00:00:00.000Z/2024-04-24T00:00:00.000Z] was revoked.\n" + + "\tat org.apache.druid.indexing.common.task.AbstractBatchIndexTask.tryTimeChunkLock(AbstractBatchIndexTask.java:465) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask.isReady(PartialHashSegmentGenerateTask.java:152) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat org.apache.druid.indexing.overlord.TaskQueue.manageInternalCritical(TaskQueue.java:420) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat org.apache.druid.indexing.overlord.TaskQueue.manageInternal(TaskQueue.java:373) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat org.apache.druid.indexing.overlord.TaskQueue.manage(TaskQueue.java:356) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat org.apache.druid.indexing.overlord.TaskQueue.access$000(TaskQueue.java:91) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat org.apache.druid.indexing.overlord.TaskQueue$1.run(TaskQueue.java:212) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n" + + "\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n" + + "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n" + + "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n" + + "\tat java.base/java.lang.Thread.run(Thread.java:829) [?:?]\n"; + + static final String EXPECTED_ERROR_MESSAGE = + "org.apache.druid.java.util.common.ISE: Lock for interval [2024-04-23T00:00:00.000Z/2024-04-24T00:00:00.000Z] was revoked.\n" + + "\tat org.apache.druid.indexing.common.task.AbstractBatchIndexTask.tryTimeChunkLock(AbstractBatchIndexTask.java:465) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask.isReady(PartialHashSegmentGenerateTask.java:152) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat org.apache.druid.i...584 characters omitted...e$1.run(TaskQueue.java:212) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n" + + "\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n" + + "\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n" + + "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n" + + "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n" + + "\tat java.base/java.lang.Thread.run(Thread.java:829) [?:?]\n"; + @Test public void testSerde() throws IOException { @@ -64,4 +90,11 @@ public class TaskStatusTest Assert.assertEquals(success.getLocation().getPort(), 0); Assert.assertEquals(success.getLocation().getTlsPort(), 1); } + + @Test + public void testTruncation() + { + final TaskStatus status = TaskStatus.failure("testId", STACK_TRACE); + Assert.assertEquals(status.getErrorMsg(), EXPECTED_ERROR_MESSAGE); + } }