mirror of https://github.com/apache/druid.git
StreamingTaskRunner: Close the rejection period updater executor service (#17490)
This commit is contained in:
parent
8853c7e5c6
commit
c1d6328249
|
@ -128,6 +128,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -249,6 +250,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
|
||||||
|
|
||||||
private volatile DateTime minMessageTime;
|
private volatile DateTime minMessageTime;
|
||||||
private volatile DateTime maxMessageTime;
|
private volatile DateTime maxMessageTime;
|
||||||
|
private final ScheduledExecutorService rejectionPeriodUpdaterExec;
|
||||||
|
|
||||||
public SeekableStreamIndexTaskRunner(
|
public SeekableStreamIndexTaskRunner(
|
||||||
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
|
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
|
||||||
|
@ -273,15 +275,15 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
|
||||||
|
|
||||||
minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
|
minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
|
||||||
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
|
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
|
||||||
|
rejectionPeriodUpdaterExec = Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d");
|
||||||
|
|
||||||
if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) {
|
if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) {
|
||||||
Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d")
|
rejectionPeriodUpdaterExec
|
||||||
.scheduleWithFixedDelay(
|
.scheduleWithFixedDelay(
|
||||||
this::refreshMinMaxMessageTime,
|
this::refreshMinMaxMessageTime,
|
||||||
ioConfig.getRefreshRejectionPeriodsInMinutes(),
|
ioConfig.getRefreshRejectionPeriodsInMinutes(),
|
||||||
ioConfig.getRefreshRejectionPeriodsInMinutes(),
|
ioConfig.getRefreshRejectionPeriodsInMinutes(),
|
||||||
TimeUnit.MINUTES
|
TimeUnit.MINUTES);
|
||||||
);
|
|
||||||
}
|
}
|
||||||
resetNextCheckpointTime();
|
resetNextCheckpointTime();
|
||||||
}
|
}
|
||||||
|
@ -940,6 +942,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
|
||||||
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
|
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
|
||||||
toolbox.getDataSegmentServerAnnouncer().unannounce();
|
toolbox.getDataSegmentServerAnnouncer().unannounce();
|
||||||
}
|
}
|
||||||
|
rejectionPeriodUpdaterExec.shutdown();
|
||||||
}
|
}
|
||||||
catch (Throwable e) {
|
catch (Throwable e) {
|
||||||
if (caughtExceptionOuter != null) {
|
if (caughtExceptionOuter != null) {
|
||||||
|
|
Loading…
Reference in New Issue