Surface lock revocation exceptions in task status (#16325)

This commit is contained in:
Arun Ramani 2024-04-25 20:09:44 -07:00 committed by GitHub
parent 4b6748bdc9
commit 126a0c219a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 164 additions and 48 deletions

View File

@ -42,8 +42,6 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.ControllerContext;
@ -232,9 +230,8 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery,
if (taskLock == null) { if (taskLock == null) {
return false; return false;
} else if (taskLock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked", interval));
} }
taskLock.assertNotRevoked();
} }
} }

View File

@ -20,6 +20,14 @@
package org.apache.druid.msq.indexing; package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
@ -28,27 +36,31 @@ import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings; import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Collections; import java.util.Collections;
import java.util.List;
public class MSQControllerTaskTest public class MSQControllerTaskTest
{ {
private final List<Interval> INTERVALS =
Collections.singletonList(Intervals.of(
"2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"));
private final MSQSpec MSQ_SPEC = MSQSpec private final MSQSpec MSQ_SPEC = MSQSpec
.builder() .builder()
.destination(new DataSourceMSQDestination( .destination(new DataSourceMSQDestination(
"target", "target",
Granularities.DAY, Granularities.DAY,
null, null,
null INTERVALS
)) ))
.query(new Druids.ScanQueryBuilder() .query(new Druids.ScanQueryBuilder()
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false) .legacy(false)
.intervals(new MultipleIntervalSegmentSpec( .intervals(new MultipleIntervalSegmentSpec(INTERVALS))
Collections.singletonList(Intervals.of(
"2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"))))
.dataSource("target") .dataSource("target")
.build() .build()
) )
@ -88,4 +100,84 @@ public class MSQControllerTaskTest
); );
Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId()); Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId());
} }
@Test
public void testIsReady() throws Exception
{
final String taskId = "taskId";
MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
MSQ_SPEC,
null,
null,
null,
null,
null,
null
);
TestTaskActionClient taskActionClient = new TestTaskActionClient(
new TimeChunkLock(
TaskLockType.REPLACE,
"groupId",
"dataSource",
INTERVALS.get(0),
"0",
0
)
);
Assert.assertTrue(controllerTask.isReady(taskActionClient));
}
@Test
public void testIsReadyWithRevokedLock()
{
final String taskId = "taskId";
MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
MSQ_SPEC,
null,
null,
null,
null,
null,
null
);
TestTaskActionClient taskActionClient = new TestTaskActionClient(
new TimeChunkLock(
TaskLockType.REPLACE,
"groupId",
"dataSource",
INTERVALS.get(0),
"0",
0,
true
)
);
DruidException exception = Assert.assertThrows(
DruidException.class,
() -> controllerTask.isReady(taskActionClient));
Assert.assertEquals(
"Lock of type[REPLACE] for interval[2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z] was revoked",
exception.getMessage());
}
private static class TestTaskActionClient implements TaskActionClient
{
private final TaskLock taskLock;
TestTaskActionClient(TaskLock taskLock)
{
this.taskLock = taskLock;
}
@SuppressWarnings("unchecked")
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction)
{
if (!(taskAction instanceof TimeChunkLockTryAcquireAction)) {
throw new ISE("action[%s] is not supported", taskAction);
}
return (RetType) taskLock;
}
}
} }

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.LockRequest; import org.apache.druid.indexing.overlord.LockRequest;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -62,4 +63,18 @@ public interface TaskLock
boolean isRevoked(); boolean isRevoked();
boolean conflict(LockRequest request); boolean conflict(LockRequest request);
/**
* Checks if the lock is revoked and throws a {@link DruidException} if so.
*
* @throws DruidException if the lock is revoked.
*/
default void assertNotRevoked()
{
if (isRevoked()) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Lock of type[%s] for interval[%s] was revoked", getType(), getInterval());
}
}
} }

View File

@ -59,7 +59,6 @@ import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.granularity.GranularityType;
@ -486,9 +485,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
if (lock == null) { if (lock == null) {
return false; return false;
} }
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", cur));
}
locksAcquired++; locksAcquired++;
intervalToLockVersion.put(cur, lock.getVersion()); intervalToLockVersion.put(cur, lock.getVersion());
} }
@ -829,9 +826,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
interval interval
); );
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
version = lock.getVersion(); version = lock.getVersion();
} else { } else {
version = existingLockVersion; version = existingLockVersion;

View File

@ -30,7 +30,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.IOException; import java.io.IOException;
@ -88,9 +87,7 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
if (lock == null) { if (lock == null) {
return false; return false;
} }
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
return true; return true;
} }

View File

@ -337,9 +337,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask
if (lock == null) { if (lock == null) {
return false; return false;
} }
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segmentId.getInterval()));
}
return true; return true;
} }
} }

View File

@ -52,7 +52,6 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever; import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
@ -221,9 +220,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
if (lock == null) { if (lock == null) {
return false; return false;
} }
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
return true; return true;
} else { } else {
return true; return true;
@ -420,9 +417,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
), ),
"Cannot acquire a lock for interval[%s]", interval "Cannot acquire a lock for interval[%s]", interval
); );
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
version = lock.getVersion(); version = lock.getVersion();
} else { } else {
Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient()); Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());

View File

@ -42,7 +42,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
@ -263,9 +262,7 @@ public class RealtimeIndexTask extends AbstractTask
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
segment.getInterval() segment.getInterval()
); );
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segment.getInterval()));
}
toolbox.getSegmentAnnouncer().announceSegment(segment); toolbox.getSegmentAnnouncer().announceSegment(segment);
} }
@ -292,9 +289,7 @@ public class RealtimeIndexTask extends AbstractTask
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
segment.getInterval() segment.getInterval()
); );
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segment.getInterval()));
}
} }
toolbox.getSegmentAnnouncer().announceSegments(segments); toolbox.getSegmentAnnouncer().announceSegments(segments);
} }
@ -346,9 +341,7 @@ public class RealtimeIndexTask extends AbstractTask
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
interval interval
); );
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
return lock.getVersion(); return lock.getVersion();
} }
catch (IOException e) { catch (IOException e) {

View File

@ -424,15 +424,16 @@ public class TaskQueue
catch (Exception e) { catch (Exception e) {
log.warn(e, "Exception thrown during isReady for task: %s", task.getId()); log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
final String errorMessage; final String errorMessage;
if (e instanceof MaxAllowedLocksExceededException) { if (e instanceof MaxAllowedLocksExceededException || e instanceof DruidException) {
errorMessage = e.getMessage(); errorMessage = e.getMessage();
} else { } else {
errorMessage = StringUtils.format( errorMessage = StringUtils.format(
"Encountered error[%s] while waiting for task to be ready. See Overlord logs for more details.", "Encountered error[%s] while waiting for task to be ready. See Overlord logs for more details.",
StringUtils.chop(e.getMessage(), 100) e.getMessage()
); );
} }
notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage); TaskStatus taskStatus = TaskStatus.failure(task.getId(), errorMessage);
notifyStatus(task, taskStatus, taskStatus.getErrorMsg());
continue; continue;
} }
if (taskIsReady) { if (taskIsReady) {

View File

@ -464,9 +464,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
if (lock == null) { if (lock == null) {
return false; return false;
} }
if (lock.isRevoked()) { lock.assertNotRevoked();
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segmentId.getInterval()));
}
return true; return true;
} }
} }

View File

@ -37,7 +37,7 @@ import java.util.Objects;
*/ */
public class TaskStatus public class TaskStatus
{ {
public static final int MAX_ERROR_MSG_LENGTH = 100; public static final int MAX_ERROR_MSG_TRUNCATION_LIMIT = 1024;
public static TaskStatus running(String taskId) public static TaskStatus running(String taskId)
{ {
@ -88,8 +88,10 @@ public class TaskStatus
*/ */
private static @Nullable String truncateErrorMsg(@Nullable String errorMsg) private static @Nullable String truncateErrorMsg(@Nullable String errorMsg)
{ {
if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_LENGTH) { if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_TRUNCATION_LIMIT) {
return errorMsg.substring(0, MAX_ERROR_MSG_LENGTH) + "..."; return errorMsg.substring(0, MAX_ERROR_MSG_TRUNCATION_LIMIT / 2)
+ "..." + (errorMsg.length() - MAX_ERROR_MSG_TRUNCATION_LIMIT) + " characters omitted..."
+ errorMsg.substring(errorMsg.length() - MAX_ERROR_MSG_TRUNCATION_LIMIT / 2);
} else { } else {
return errorMsg; return errorMsg;
} }

View File

@ -27,6 +27,32 @@ import java.io.IOException;
public class TaskStatusTest public class TaskStatusTest
{ {
static final String STACK_TRACE =
"org.apache.druid.java.util.common.ISE: Lock for interval [2024-04-23T00:00:00.000Z/2024-04-24T00:00:00.000Z] was revoked.\n"
+ "\tat org.apache.druid.indexing.common.task.AbstractBatchIndexTask.tryTimeChunkLock(AbstractBatchIndexTask.java:465) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask.isReady(PartialHashSegmentGenerateTask.java:152) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat org.apache.druid.indexing.overlord.TaskQueue.manageInternalCritical(TaskQueue.java:420) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat org.apache.druid.indexing.overlord.TaskQueue.manageInternal(TaskQueue.java:373) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat org.apache.druid.indexing.overlord.TaskQueue.manage(TaskQueue.java:356) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat org.apache.druid.indexing.overlord.TaskQueue.access$000(TaskQueue.java:91) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat org.apache.druid.indexing.overlord.TaskQueue$1.run(TaskQueue.java:212) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n"
+ "\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n"
+ "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n"
+ "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n"
+ "\tat java.base/java.lang.Thread.run(Thread.java:829) [?:?]\n";
static final String EXPECTED_ERROR_MESSAGE =
"org.apache.druid.java.util.common.ISE: Lock for interval [2024-04-23T00:00:00.000Z/2024-04-24T00:00:00.000Z] was revoked.\n"
+ "\tat org.apache.druid.indexing.common.task.AbstractBatchIndexTask.tryTimeChunkLock(AbstractBatchIndexTask.java:465) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask.isReady(PartialHashSegmentGenerateTask.java:152) ~[druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat org.apache.druid.i...584 characters omitted...e$1.run(TaskQueue.java:212) [druid-indexing-service-2024.03.0-iap.jar:2024.03.0-iap]\n"
+ "\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n"
+ "\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n"
+ "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n"
+ "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n"
+ "\tat java.base/java.lang.Thread.run(Thread.java:829) [?:?]\n";
@Test @Test
public void testSerde() throws IOException public void testSerde() throws IOException
{ {
@ -64,4 +90,11 @@ public class TaskStatusTest
Assert.assertEquals(success.getLocation().getPort(), 0); Assert.assertEquals(success.getLocation().getPort(), 0);
Assert.assertEquals(success.getLocation().getTlsPort(), 1); Assert.assertEquals(success.getLocation().getTlsPort(), 1);
} }
@Test
public void testTruncation()
{
final TaskStatus status = TaskStatus.failure("testId", STACK_TRACE);
Assert.assertEquals(status.getErrorMsg(), EXPECTED_ERROR_MESSAGE);
}
} }