mirror of
https://github.com/apache/druid.git
synced 2025-02-20 00:47:40 +00:00
use memoized supplier for lazy singleton in SeekableStreamIndexTask.java (#7740)
This commit is contained in:
parent
54b3f363c4
commit
daf20b4b86
@ -24,6 +24,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.base.Suppliers;
|
||||||
import org.apache.druid.data.input.InputRow;
|
import org.apache.druid.data.input.InputRow;
|
||||||
import org.apache.druid.indexer.TaskStatus;
|
import org.apache.druid.indexer.TaskStatus;
|
||||||
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
|
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
|
||||||
@ -77,8 +79,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
|
|||||||
// Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
|
// Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
|
||||||
// See https://github.com/apache/incubator-druid/issues/7724 for issues that can cause.
|
// See https://github.com/apache/incubator-druid/issues/7724 for issues that can cause.
|
||||||
// By the way, lazily init is synchronized because the runner may be needed in multiple threads.
|
// By the way, lazily init is synchronized because the runner may be needed in multiple threads.
|
||||||
private final Object runnerInitLock = new Object();
|
private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>> runnerSupplier;
|
||||||
private volatile SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
|
|
||||||
|
|
||||||
public SeekableStreamIndexTask(
|
public SeekableStreamIndexTask(
|
||||||
final String id,
|
final String id,
|
||||||
@ -112,6 +113,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
|
|||||||
this.context = context;
|
this.context = context;
|
||||||
this.authorizerMapper = authorizerMapper;
|
this.authorizerMapper = authorizerMapper;
|
||||||
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
|
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
|
||||||
|
this.runnerSupplier = Suppliers.memoize(this::createTaskRunner);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String makeTaskId(String dataSource, String type)
|
private static String makeTaskId(String dataSource, String type)
|
||||||
@ -288,14 +290,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
|
|||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner()
|
public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner()
|
||||||
{
|
{
|
||||||
if (runner == null) {
|
return runnerSupplier.get();
|
||||||
synchronized (runnerInitLock) {
|
|
||||||
if (runner == null) {
|
|
||||||
runner = createTaskRunner();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return runner;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user