mirror of https://github.com/apache/druid.git
Add pre-check for heavy debug logs (#15706)
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com> Co-authored-by: Benedict Jin <asdf2014@apache.org>
This commit is contained in:
parent
7c42e87db9
commit
e0bce0ef90
|
@ -752,7 +752,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
||||||
);
|
);
|
||||||
|
|
||||||
// This is for potential debugging in case we suspect bad estimation of cardinalities etc,
|
// This is for potential debugging in case we suspect bad estimation of cardinalities etc,
|
||||||
LOG.debug("intervalToNumShards: %s", intervalToNumShards.toString());
|
LOG.debug("intervalToNumShards: %s", intervalToNumShards);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
intervalToNumShards = CollectionUtils.mapValues(
|
intervalToNumShards = CollectionUtils.mapValues(
|
||||||
|
|
|
@ -1155,14 +1155,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
Instant handleNoticeEndTime = Instant.now();
|
Instant handleNoticeEndTime = Instant.now();
|
||||||
Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
|
Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
|
||||||
String noticeType = notice.getType();
|
String noticeType = notice.getType();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
log.debug(
|
log.debug(
|
||||||
"Handled notice [%s] from notices queue in [%d] ms, "
|
"Handled notice [%s] from notices queue in [%d] ms, "
|
||||||
+ "current notices queue size [%d] for datasource [%s]",
|
+ "current notices queue size [%d] for datasource [%s]",
|
||||||
noticeType,
|
noticeType, timeElapsed.toMillis(), getNoticesQueueSize(), dataSource
|
||||||
timeElapsed.toMillis(),
|
|
||||||
getNoticesQueueSize(),
|
|
||||||
dataSource
|
|
||||||
);
|
);
|
||||||
|
}
|
||||||
emitNoticeProcessTime(noticeType, timeElapsed.toMillis());
|
emitNoticeProcessTime(noticeType, timeElapsed.toMillis());
|
||||||
}
|
}
|
||||||
catch (Throwable e) {
|
catch (Throwable e) {
|
||||||
|
|
|
@ -149,7 +149,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
|
||||||
long totalLags = lagStats.getTotalLag();
|
long totalLags = lagStats.getTotalLag();
|
||||||
lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
|
lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
|
||||||
}
|
}
|
||||||
log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource);
|
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
|
||||||
} else {
|
} else {
|
||||||
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
|
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,7 +185,9 @@ public class OverlordResourceTestClient
|
||||||
HttpMethod.GET,
|
HttpMethod.GET,
|
||||||
StringUtils.format("%s%s", getIndexerURL(), identifier)
|
StringUtils.format("%s%s", getIndexerURL(), identifier)
|
||||||
);
|
);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Tasks %s response %s", identifier, response.getContent());
|
LOG.debug("Tasks %s response %s", identifier, response.getContent());
|
||||||
|
}
|
||||||
return jsonMapper.readValue(
|
return jsonMapper.readValue(
|
||||||
response.getContent(), new TypeReference<List<TaskResponseObject>>()
|
response.getContent(), new TypeReference<List<TaskResponseObject>>()
|
||||||
{
|
{
|
||||||
|
@ -204,7 +206,9 @@ public class OverlordResourceTestClient
|
||||||
HttpMethod.GET,
|
HttpMethod.GET,
|
||||||
StringUtils.format("%stask/%s", getIndexerURL(), StringUtils.urlEncode(taskId))
|
StringUtils.format("%stask/%s", getIndexerURL(), StringUtils.urlEncode(taskId))
|
||||||
);
|
);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Task %s response %s", taskId, response.getContent());
|
LOG.debug("Task %s response %s", taskId, response.getContent());
|
||||||
|
}
|
||||||
return jsonMapper.readValue(
|
return jsonMapper.readValue(
|
||||||
response.getContent(), new TypeReference<TaskPayloadResponse>()
|
response.getContent(), new TypeReference<TaskPayloadResponse>()
|
||||||
{
|
{
|
||||||
|
|
|
@ -595,7 +595,6 @@ public class FrameProcessorExecutor
|
||||||
|
|
||||||
sb.append("; cancel=").append(finishedFuture.isCancelled() ? "y" : "n");
|
sb.append("; cancel=").append(finishedFuture.isCancelled() ? "y" : "n");
|
||||||
sb.append("; done=").append(finishedFuture.isDone() ? "y" : "n");
|
sb.append("; done=").append(finishedFuture.isDone() ? "y" : "n");
|
||||||
|
|
||||||
log.debug(StringUtils.encodeForFormat(sb.toString()));
|
log.debug(StringUtils.encodeForFormat(sb.toString()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -475,20 +475,17 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
|
|
||||||
final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1);
|
final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1);
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
ForkJoinPool pool = getPool();
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] "
|
"Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] "
|
||||||
+ "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] "
|
+ "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] "
|
||||||
+ "pool parallelism: [%s] pool size: [%s] steal count: [%s]",
|
+ "pool parallelism: [%s] pool size: [%s] steal count: [%s]",
|
||||||
computedNumParallelTasks,
|
computedNumParallelTasks, parallelism,
|
||||||
parallelism,
|
pool.getActiveThreadCount(), runningThreadCount, submissionCount, pool.getQueuedTaskCount(),
|
||||||
getPool().getActiveThreadCount(),
|
pool.getParallelism(), pool.getPoolSize(), pool.getStealCount()
|
||||||
runningThreadCount,
|
|
||||||
submissionCount,
|
|
||||||
getPool().getQueuedTaskCount(),
|
|
||||||
getPool().getParallelism(),
|
|
||||||
getPool().getPoolSize(),
|
|
||||||
getPool().getStealCount()
|
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
return computedNumParallelTasks;
|
return computedNumParallelTasks;
|
||||||
}
|
}
|
||||||
|
|
|
@ -490,11 +490,9 @@ public class FileSmoosher implements Closeable
|
||||||
public void close() throws IOException
|
public void close() throws IOException
|
||||||
{
|
{
|
||||||
closer.close();
|
closer.close();
|
||||||
FileSmoosher.LOG.debug(
|
if (LOG.isDebugEnabled()) {
|
||||||
"Created smoosh file [%s] of size [%s] bytes.",
|
LOG.debug("Created smoosh file [%s] of size [%s] bytes.", outFile.getAbsolutePath(), outFile.length());
|
||||||
outFile.getAbsolutePath(),
|
}
|
||||||
outFile.length()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,9 @@ public class DataServerClient
|
||||||
requestBuilder = requestBuilder.jsonContent(objectMapper, query);
|
requestBuilder = requestBuilder.jsonContent(objectMapper, query);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.debug("Sending request to servers for query[%s], request[%s]", query.getId(), requestBuilder.toString());
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Sending request to servers for query[%s], request[%s]", query.getId(), requestBuilder);
|
||||||
|
}
|
||||||
ListenableFuture<InputStream> resultStreamFuture = serviceClient.asyncRequest(
|
ListenableFuture<InputStream> resultStreamFuture = serviceClient.asyncRequest(
|
||||||
requestBuilder,
|
requestBuilder,
|
||||||
new DataServerResponseHandler(query, responseContext, objectMapper)
|
new DataServerResponseHandler(query, responseContext, objectMapper)
|
||||||
|
|
|
@ -538,10 +538,9 @@ public class DruidCoordinator
|
||||||
if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
|
if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
|
||||||
duties.add(compactSegments);
|
duties.add(compactSegments);
|
||||||
}
|
}
|
||||||
log.debug(
|
if (log.isDebugEnabled()) {
|
||||||
"Initialized indexing service duties [%s].",
|
log.debug("Initialized indexing service duties [%s].", duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()));
|
||||||
duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())
|
}
|
||||||
);
|
|
||||||
return ImmutableList.copyOf(duties);
|
return ImmutableList.copyOf(duties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -230,12 +230,12 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
|
||||||
final String path = ZKPaths.makePath(basePath, segmentHolder.getSegmentIdentifier());
|
final String path = ZKPaths.makePath(basePath, segmentHolder.getSegmentIdentifier());
|
||||||
final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest());
|
final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest());
|
||||||
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
|
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
log.debug(
|
log.debug(
|
||||||
"ZKNode created for server to [%s] %s [%s]",
|
"ZKNode created for server to [%s] %s [%s]",
|
||||||
basePath,
|
basePath, segmentHolder.getAction(), segmentHolder.getSegmentIdentifier()
|
||||||
segmentHolder.getAction(),
|
|
||||||
segmentHolder.getSegmentIdentifier()
|
|
||||||
);
|
);
|
||||||
|
}
|
||||||
final ScheduledFuture<?> nodeDeletedCheck = scheduleNodeDeletedCheck(path);
|
final ScheduledFuture<?> nodeDeletedCheck = scheduleNodeDeletedCheck(path);
|
||||||
final Stat stat = curator.checkExists().usingWatcher(
|
final Stat stat = curator.checkExists().usingWatcher(
|
||||||
(CuratorWatcher) watchedEvent -> {
|
(CuratorWatcher) watchedEvent -> {
|
||||||
|
|
Loading…
Reference in New Issue