Adding more debug logs to increase visibility into StreamSupervisor notices queue size and processing time. (#11415)

This commit is contained in:
Harini Rajendran 2021-07-08 18:15:15 -05:00 committed by GitHub
parent 63fcd77c38
commit 4c90c0c21d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 5 deletions

View File

@ -93,6 +93,8 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -825,10 +827,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
synchronized (stopLock) {
if (stopGracefully) {
log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish");
notices.add(new GracefulShutdownNotice());
addNotice(new GracefulShutdownNotice());
} else {
log.info("Posting ShutdownNotice");
notices.add(new ShutdownNotice());
addNotice(new ShutdownNotice());
}
long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis();
@ -865,7 +867,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public void reset(DataSourceMetadata dataSourceMetadata)
{
log.info("Posting ResetNotice");
notices.add(new ResetNotice(dataSourceMetadata));
addNotice(new ResetNotice(dataSourceMetadata));
}
public ReentrantLock getRecordSupplierLock()
@ -902,7 +904,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
try {
Instant handleNoticeStartTime = Instant.now();
notice.handle();
Instant handleNoticeEndTime = Instant.now();
Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
log.debug("Handled notice [%s] from notices queue in [%d] ms, current notices queue size [%d]", notice.getClass().getName(), timeElapsed.toMillis(), getNoticesQueueSize());
}
catch (Throwable e) {
stateManager.recordThrowableEvent(e);
@ -956,7 +962,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private Runnable buildRunTask()
{
return () -> notices.add(new RunNotice());
return () -> addNotice(new RunNotice());
}
@Override
@ -1274,7 +1280,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Override
public void statusChanged(String taskId, TaskStatus status)
{
notices.add(new RunNotice());
addNotice(new RunNotice());
}
}, Execs.directExecutor()
);
@ -3109,6 +3115,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private void addNotice(Notice notice)
{
log.debug("Adding notice [%s] to notices queue", notice.getClass().getName());
notices.add(notice);
}