mirror of https://github.com/apache/druid.git
MSQ: Include stageId, workerNumber in processing thread names. (#17324)
* MSQ: Include stageId, workerNumber in processing thread names. Helps identify which query was running in a thread dump. * s/dart/msq/
This commit is contained in:
parent
a0c29f8bbb
commit
b287b219a8
|
@ -969,10 +969,11 @@ public class WorkerImpl implements Worker
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns cancellation ID for a particular stage, to be used in {@link FrameProcessorExecutor#cancel(String)}.
|
* Returns cancellation ID for a particular stage, to be used in {@link FrameProcessorExecutor#cancel(String)}.
|
||||||
|
* In addition to being a token for cancellation, this also appears in thread dumps, so make it a little descriptive.
|
||||||
*/
|
*/
|
||||||
private static String cancellationIdFor(final StageId stageId, final int workerNumber)
|
private static String cancellationIdFor(final StageId stageId, final int workerNumber)
|
||||||
{
|
{
|
||||||
return StringUtils.format("%s_%s", stageId, workerNumber);
|
return StringUtils.format("msq-worker[%s_%s]", stageId, workerNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -222,6 +222,7 @@ public class FrameProcessorExecutor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String threadName = Thread.currentThread().getName();
|
||||||
boolean canceled = false;
|
boolean canceled = false;
|
||||||
Either<Throwable, ReturnOrAwait<T>> retVal;
|
Either<Throwable, ReturnOrAwait<T>> retVal;
|
||||||
|
|
||||||
|
@ -230,6 +231,11 @@ public class FrameProcessorExecutor
|
||||||
throw new InterruptedException();
|
throw new InterruptedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (cancellationId != null) {
|
||||||
|
// Set the thread name to something involving the cancellationId, to make thread dumps more useful.
|
||||||
|
Thread.currentThread().setName(threadName + "-" + cancellationId);
|
||||||
|
}
|
||||||
|
|
||||||
retVal = Either.value(processor.runIncrementally(readableInputs));
|
retVal = Either.value(processor.runIncrementally(readableInputs));
|
||||||
}
|
}
|
||||||
catch (Throwable e) {
|
catch (Throwable e) {
|
||||||
|
@ -253,6 +259,9 @@ public class FrameProcessorExecutor
|
||||||
canceled = true;
|
canceled = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Restore original thread name.
|
||||||
|
Thread.currentThread().setName(threadName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue