mirror of https://github.com/apache/druid.git
Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764)
* https://github.com/apache/incubator-druid/issues/7316 Use Map.putIfAbsent() instead of containsKey() + put()
* fixing indentation
* Using map.computeIfAbsent() instead of map.putIfAbsent() where appropriate
* fixing checkstyle
* Changing the recommendation text
* Reverting auto changes made by IDE
* Implementing recommendation: A ConcurrentHashMap on which computeIfAbsent() is called should be assigned into variables of ConcurrentHashMap type, not ConcurrentMap
* Removing unused import
parent 05346a9e0c
commit 3bee6adcf7
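
The refactoring applied throughout the diff below collapses the two-step containsKey()/put() sequence into a single map call: putIfAbsent() when the default value is cheap to build, computeIfAbsent() when building it is expensive or has side effects, since the factory function then only runs for a genuinely absent key. A standalone sketch of the three forms, not taken from the Druid sources:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class PutIfAbsentSketch
    {
      public static void main(String[] args)
      {
        Map<String, Set<String>> index = new HashMap<>();

        // Before: two map operations, and the value is built even if the key exists.
        if (!index.containsKey("druid")) {
          index.put("druid", new HashSet<>());
        }

        // After, when the default value is cheap to create:
        index.putIfAbsent("segments", new HashSet<>());

        // After, when creating the value is expensive or has side effects: the factory
        // lambda only runs if the key is absent, and the current value is returned.
        index.computeIfAbsent("lookups", k -> new HashSet<>()).add("lookup-1");

        System.out.println(index);
      }
    }
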
@@ -306,6 +306,11 @@
         <constraint name="x" maxCount="2147483647" within="" contains="" />
         <constraint name="ImmutableMap" regexp="Immutable.*" within="" contains="" />
       </searchConfiguration>
+      <searchConfiguration name="Use map.putIfAbsent(k,v) or map.computeIfAbsent(k,v) where appropriate instead of containsKey() + put(). If computing v is expensive or has side effects use map.computeIfAbsent() instead" created="1558868694225" text="if (!$m$.containsKey($k$)) { $m$.put($k$, $v$); }" recursive="false" caseInsensitive="true" type="JAVA">
+        <constraint name="m" within="" contains="" />
+        <constraint name="k" within="" contains="" />
+        <constraint name="v" within="" contains="" />
+      </searchConfiguration>
     </inspection_tool>
     <inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
@@ -400,4 +405,4 @@
       <option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
     </inspection_tool>
   </profile>
 </component>
@@ -261,9 +261,7 @@ public class S3DataSegmentMoverTest
     @Override
     public PutObjectResult putObject(String bucketName, String key, File file)
     {
-      if (!storage.containsKey(bucketName)) {
-        storage.put(bucketName, new HashSet<>());
-      }
+      storage.putIfAbsent(bucketName, new HashSet<>());
       storage.get(bucketName).add(key);
       return new PutObjectResult();
     }
@@ -307,9 +307,7 @@ public class DetermineHashedPartitionsJob implements Jobby
                                          .getSegmentGranularity()
                                          .bucket(DateTimes.utc(inputRow.getTimestampFromEpoch()));
 
-        if (!hyperLogLogs.containsKey(interval)) {
-          hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
-        }
+        hyperLogLogs.computeIfAbsent(interval, intv -> HyperLogLogCollector.makeLatestCollector());
       } else {
         final Optional<Interval> maybeInterval = config.getGranularitySpec()
                                                        .bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()));
@@ -50,9 +50,7 @@ public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
     // Group by dataSource
     final Map<String, Set<SegmentIdWithShardSpec>> identifiersByDataSource = new TreeMap<>();
     for (SegmentIdWithShardSpec identifier : identifiers) {
-      if (!identifiersByDataSource.containsKey(identifier.getDataSource())) {
-        identifiersByDataSource.put(identifier.getDataSource(), new HashSet<>());
-      }
+      identifiersByDataSource.computeIfAbsent(identifier.getDataSource(), k -> new HashSet<>());
       identifiersByDataSource.get(identifier.getDataSource()).add(identifier);
     }
 
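
Because computeIfAbsent() returns the mapped value (whether it already existed or was just created), the separate get(...).add(...) that follows in this hunk could also be folded into a single expression. A minimal sketch of that idiom with invented names; it is not part of this patch:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class GroupByDataSourceSketch
    {
      public static void main(String[] args)
      {
        Map<String, Set<String>> byDataSource = new HashMap<>();
        // Create the per-dataSource set on demand and add to it in one expression.
        byDataSource.computeIfAbsent("wikipedia", k -> new HashSet<>()).add("segment-1");
        byDataSource.computeIfAbsent("wikipedia", k -> new HashSet<>()).add("segment-2");
        System.out.println(byDataSource); // one key, two segment ids
      }
    }
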
@@ -768,9 +768,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
           }
 
           if (determineNumPartitions) {
-            if (!hllCollectors.containsKey(interval)) {
-              hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
-            }
+            hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
 
             List<Object> groupKey = Rows.toGroupKey(
                 queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
@@ -781,9 +779,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
           } else {
             // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
             // for the interval and don't instantiate a HLL collector
-            if (!hllCollectors.containsKey(interval)) {
-              hllCollectors.put(interval, Optional.absent());
-            }
+            hllCollectors.putIfAbsent(interval, Optional.absent());
           }
           determinePartitionsMeters.incrementProcessed();
         }
@@ -204,87 +204,87 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
         segmentIds
     );
 
-    try {
-      final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
+    final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
 
     // Download all segments locally.
     // Note: this requires enough local storage space to fit all of the segments, even though
     // IngestSegmentFirehose iterates over the segments in series. We may want to change this
     // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
     final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
     Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
     for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
       for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
         final DataSegment segment = chunk.getObject();
-        if (!segmentFileMap.containsKey(segment)) {
-          segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
-        }
+
+        segmentFileMap.computeIfAbsent(segment, k -> {
+          try {
+            return segmentLoader.getSegmentFiles(segment);
+          }
+          catch (SegmentLoadingException e) {
+            throw new RuntimeException(e);
+          }
+        });
       }
     }
 
     final List<String> dims;
     if (dimensions != null) {
       dims = dimensions;
     } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
       dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
     } else {
       dims = getUniqueDimensions(
           timeLineSegments,
           inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
       );
     }
 
     final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
 
     final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
         Iterables.concat(
             Iterables.transform(
                 timeLineSegments,
-                new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>()
-                {
+                new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>() {
                   @Override
                   public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
                   {
                     return
                         Iterables.transform(
                             holder.getObject(),
-                            new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>()
-                            {
+                            new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>() {
                               @Override
                               public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
                               {
                                 final DataSegment segment = input.getObject();
                                 try {
                                   return new WindowedStorageAdapter(
                                       new QueryableIndexStorageAdapter(
                                           indexIO.loadIndex(
                                               Preconditions.checkNotNull(
                                                   segmentFileMap.get(segment),
                                                   "File for segment %s", segment.getId()
                                               )
                                           )
                                       ),
                                       holder.getInterval()
                                   );
                                 }
                                 catch (IOException e) {
                                   throw new RuntimeException(e);
                                 }
                               }
                             }
                         );
                   }
                 }
             )
         )
     );
 
     final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
     return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
-    }
-    catch (SegmentLoadingException e) {
-      throw new RuntimeException(e);
-    }
   }
 
   private long jitter(long input)
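
One detail worth noting in the hunk above: the function handed to computeIfAbsent() cannot throw checked exceptions, so the new code wraps SegmentLoadingException in a RuntimeException inside the lambda and the outer try/catch goes away. A self-contained sketch of the same shape, with a hypothetical loader and exception standing in for the Druid types:

    import java.io.File;
    import java.util.LinkedHashMap;
    import java.util.Map;

    class SegmentDownloadSketch
    {
      // Stand-ins for the real SegmentLoadingException / SegmentLoader, for illustration only.
      static class LoadingException extends Exception {}

      interface Loader
      {
        File fetch(String segmentId) throws LoadingException;
      }

      static File download(Map<String, File> cache, Loader loader, String segmentId)
      {
        return cache.computeIfAbsent(segmentId, id -> {
          try {
            return loader.fetch(id);
          }
          catch (LoadingException e) {
            // Checked exceptions cannot escape the lambda, so rethrow unchecked.
            throw new RuntimeException(e);
          }
        });
      }

      public static void main(String[] args)
      {
        Map<String, File> cache = new LinkedHashMap<>();
        System.out.println(download(cache, id -> new File("/tmp/" + id), "segment-1"));
      }
    }
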
@@ -508,13 +508,14 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
     // segments to olders.
 
     // timelineSegments are sorted in order of interval
-    int index = 0;
+    int[] index = {0};
     for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
       for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
         for (String metric : chunk.getObject().getMetrics()) {
-          if (!uniqueMetrics.containsKey(metric)) {
-            uniqueMetrics.put(metric, index++);
-          }
+          uniqueMetrics.computeIfAbsent(metric, k -> {
+            return index[0]++;
+          }
+          );
         }
       }
     }
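
The counter switches from int to a one-element int[] because a lambda may only capture local variables that are effectively final; the array reference stays final while its cell can still be incremented from inside the computeIfAbsent() callback. A small illustrative sketch (names invented):

    import java.util.LinkedHashMap;
    import java.util.Map;

    class MetricOrdinalSketch
    {
      public static void main(String[] args)
      {
        Map<String, Integer> ordinals = new LinkedHashMap<>();
        // A plain "int next = 0;" could not be mutated from inside the lambda.
        int[] next = {0};

        for (String metric : new String[]{"count", "sum", "count"}) {
          ordinals.computeIfAbsent(metric, k -> next[0]++);
        }

        System.out.println(ordinals); // {count=0, sum=1}
      }
    }
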
@@ -84,7 +84,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -108,7 +107,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
 
   /** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. */
-  private final ConcurrentMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
 
   private volatile boolean stopping = false;
 
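
The field keeps its ConcurrentHashMap initializer but its declared type changes, implementing the recommendation quoted in the commit message. Runtime behaviour is decided by the concrete class either way; declaring the variable as ConcurrentHashMap makes it explicit that the code relies on that class's atomic computeIfAbsent() rather than on the weaker ConcurrentMap contract. A hedged, illustrative sketch with invented names:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class TaskRegistrySketch
    {
      // Declared as the interface: only the ConcurrentMap contract is visible, which does not
      // by itself promise that the mapping function is applied at most once.
      private final ConcurrentMap<String, String> asInterface = new ConcurrentHashMap<>();

      // Declared as the concrete class: the code visibly relies on ConcurrentHashMap's atomic
      // computeIfAbsent(), where the mapping function runs at most once per absent key.
      private final ConcurrentHashMap<String, String> asConcreteClass = new ConcurrentHashMap<>();

      String lookupViaInterface(String taskName)
      {
        return asInterface.computeIfAbsent(taskName, name -> name + "-id");
      }

      String lookupViaConcreteClass(String taskName)
      {
        return asConcreteClass.computeIfAbsent(taskName, name -> name + "-id");
      }
    }
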
@@ -214,309 +213,306 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   public ListenableFuture<TaskStatus> run(final Task task)
   {
     synchronized (tasks) {
-      if (!tasks.containsKey(task.getId())) {
-        tasks.put(
-            task.getId(),
-            new ForkingTaskRunnerWorkItem(
-                task,
-                exec.submit(
-                    new Callable<TaskStatus>()
-                    {
+      tasks.computeIfAbsent(
+          task.getId(), k ->
+          new ForkingTaskRunnerWorkItem(
+              task,
+              exec.submit(
+                  new Callable<TaskStatus>() {
                     @Override
                     public TaskStatus call()
                     {
                       final String attemptUUID = UUID.randomUUID().toString();
                       final File taskDir = taskConfig.getTaskDir(task.getId());
                       final File attemptDir = new File(taskDir, attemptUUID);
 
                       final ProcessHolder processHolder;
                       final String childHost = node.getHost();
                       int childPort = -1;
                       int tlsChildPort = -1;
 
                       if (node.isEnablePlaintextPort()) {
                         childPort = portFinder.findUnusedPort();
                       }
 
                       if (node.isEnableTlsPort()) {
                         tlsChildPort = portFinder.findUnusedPort();
                       }
 
                       final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);
 
                       try {
                         final Closer closer = Closer.create();
                         try {
                           if (!attemptDir.mkdirs()) {
                             throw new IOE("Could not create directories: %s", attemptDir);
                           }
 
                           final File taskFile = new File(taskDir, "task.json");
                           final File statusFile = new File(attemptDir, "status.json");
                           final File logFile = new File(taskDir, "log");
                           final File reportsFile = new File(attemptDir, "report.json");
 
                           // time to adjust process holders
                           synchronized (tasks) {
                             final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
 
                             if (taskWorkItem.shutdown) {
                               throw new IllegalStateException("Task has been shut down!");
                             }
 
                             if (taskWorkItem == null) {
                               log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
                               throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
                             }
 
                             if (taskWorkItem.processHolder != null) {
                               log.makeAlert("WTF?! TaskInfo already has a processHolder")
                                  .addData("task", task.getId())
                                  .emit();
                               throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
                             }
 
                             final List<String> command = new ArrayList<>();
                             final String taskClasspath;
                             if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
                               taskClasspath = Joiner.on(File.pathSeparator).join(
                                   task.getClasspathPrefix(),
                                   config.getClasspath()
                               );
                             } else {
                               taskClasspath = config.getClasspath();
                             }
 
                             command.add(config.getJavaCommand());
                             command.add("-cp");
                             command.add(taskClasspath);
 
                             Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
                             Iterables.addAll(command, config.getJavaOptsArray());
 
                             // Override task specific javaOpts
                             Object taskJavaOpts = task.getContextValue(
                                 ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
                             );
                             if (taskJavaOpts != null) {
                               Iterables.addAll(
                                   command,
                                   new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
                               );
                             }
 
                             for (String propName : props.stringPropertyNames()) {
                               for (String allowedPrefix : config.getAllowedPrefixes()) {
                                 // See https://github.com/apache/incubator-druid/issues/1841
                                 if (propName.startsWith(allowedPrefix)
                                     && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
                                     && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
                                 ) {
                                   command.add(
                                       StringUtils.format(
                                           "-D%s=%s",
                                           propName,
                                           props.getProperty(propName)
                                       )
                                   );
                                 }
                               }
                             }
 
                             // Override child JVM specific properties
                             for (String propName : props.stringPropertyNames()) {
                               if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
                                 command.add(
                                     StringUtils.format(
                                         "-D%s=%s",
                                         propName.substring(CHILD_PROPERTY_PREFIX.length()),
                                         props.getProperty(propName)
                                     )
                                 );
                               }
                             }
 
                             // Override task specific properties
                             final Map<String, Object> context = task.getContext();
                             if (context != null) {
                               for (String propName : context.keySet()) {
                                 if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
                                   command.add(
                                       StringUtils.format(
                                           "-D%s=%s",
                                           propName.substring(CHILD_PROPERTY_PREFIX.length()),
                                           task.getContextValue(propName)
                                       )
                                   );
                                 }
                               }
                             }
 
                             // Add dataSource, taskId and taskType for metrics or logging
                             command.add(
                                 StringUtils.format(
                                     "-D%s%s=%s",
                                     MonitorsConfig.METRIC_DIMENSION_PREFIX,
                                     DruidMetrics.DATASOURCE,
                                     task.getDataSource()
                                 )
                             );
                             command.add(
                                 StringUtils.format(
                                     "-D%s%s=%s",
                                     MonitorsConfig.METRIC_DIMENSION_PREFIX,
                                     DruidMetrics.TASK_ID,
                                     task.getId()
                                 )
                             );
                             command.add(
                                 StringUtils.format(
                                     "-D%s%s=%s",
                                     MonitorsConfig.METRIC_DIMENSION_PREFIX,
                                     DruidMetrics.TASK_TYPE,
                                     task.getType()
                                 )
                             );
 
                             command.add(StringUtils.format("-Ddruid.host=%s", childHost));
                             command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
                             command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
                             /**
                              * These are not enabled per default to allow the user to either set or not set them
                              * Users are highly suggested to be set in druid.indexer.runner.javaOpts
                              * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
                              * for more information
                              command.add("-XX:+UseThreadPriorities");
                              command.add("-XX:ThreadPriorityPolicy=42");
                              */
 
                             command.add("org.apache.druid.cli.Main");
                             command.add("internal");
                             command.add("peon");
                             command.add(taskFile.toString());
                             command.add(statusFile.toString());
                             command.add(reportsFile.toString());
                             String nodeType = task.getNodeType();
                             if (nodeType != null) {
                               command.add("--nodeType");
                               command.add(nodeType);
                             }
 
                             if (!taskFile.exists()) {
                               jsonMapper.writeValue(taskFile, task);
                             }
 
                             log.info("Running command: %s", Joiner.on(" ").join(command));
                             taskWorkItem.processHolder = new ProcessHolder(
                                 new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
                                 logFile,
                                 taskLocation.getHost(),
                                 taskLocation.getPort(),
                                 taskLocation.getTlsPort()
                             );
 
                             processHolder = taskWorkItem.processHolder;
                             processHolder.registerWithCloser(closer);
                           }
 
                           TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
                           TaskRunnerUtils.notifyStatusChanged(
                               listeners,
                               task.getId(),
                               TaskStatus.running(task.getId())
                           );
 
                           log.info("Logging task %s output to: %s", task.getId(), logFile);
                           boolean runFailed = true;
 
                           final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
 
                           // This will block for a while. So we append the thread information with more details
                           final String priorThreadName = Thread.currentThread().getName();
                           Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
 
                           try (final OutputStream toLogfile = logSink.openStream()) {
                             ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
                             final int statusCode = processHolder.process.waitFor();
                             log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
                             if (statusCode == 0) {
                               runFailed = false;
                             }
                           }
                           finally {
                             Thread.currentThread().setName(priorThreadName);
                             // Upload task logs
                             taskLogPusher.pushTaskLog(task.getId(), logFile);
                             if (reportsFile.exists()) {
                               taskLogPusher.pushTaskReports(task.getId(), reportsFile);
                             }
                           }
 
                           TaskStatus status;
                           if (!runFailed) {
                             // Process exited successfully
                             status = jsonMapper.readValue(statusFile, TaskStatus.class);
                           } else {
                             // Process exited unsuccessfully
                             status = TaskStatus.failure(task.getId());
                           }
 
                           TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
                           return status;
                         }
                         catch (Throwable t) {
                           throw closer.rethrow(t);
                         }
                         finally {
                           closer.close();
                         }
                       }
                       catch (Throwable t) {
                         log.info(t, "Exception caught during execution");
                         throw new RuntimeException(t);
                       }
                       finally {
                         try {
                           synchronized (tasks) {
                             final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
                             if (taskWorkItem != null && taskWorkItem.processHolder != null) {
                               taskWorkItem.processHolder.process.destroy();
                             }
                             if (!stopping) {
                               saveRunningTasks();
                             }
                           }
 
                           if (node.isEnablePlaintextPort()) {
                             portFinder.markPortUnused(childPort);
                           }
                           if (node.isEnableTlsPort()) {
                             portFinder.markPortUnused(tlsChildPort);
                           }
 
                           try {
                             if (!stopping && taskDir.exists()) {
                               log.info("Removing task directory: %s", taskDir);
                               FileUtils.deleteDirectory(taskDir);
                             }
                           }
                           catch (Exception e) {
                             log.makeAlert(e, "Failed to delete task directory")
                                .addData("taskDir", taskDir.toString())
                                .addData("task", task.getId())
                                .emit();
                           }
                         }
                         catch (Exception e) {
                           log.error(e, "Suppressing exception caught while cleaning up task");
                         }
                       }
                     }
                   }
-                )
-            )
-        );
-      }
+              )
+          )
+      );
       saveRunningTasks();
       return tasks.get(task.getId()).getResult();
     }
@@ -166,9 +166,7 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
           {
             try {
               String specId = pair.lhs;
-              if (!retVal.containsKey(specId)) {
-                retVal.put(specId, new ArrayList<>());
-              }
+              retVal.putIfAbsent(specId, new ArrayList<>());
 
               retVal.get(specId).add(pair.rhs);
               return retVal;
@@ -549,9 +549,7 @@ public class DataSourcesResource
         continue;
       }
 
-      if (!tierDistinctSegments.containsKey(tier)) {
-        tierDistinctSegments.put(tier, new HashSet<>());
-      }
+      tierDistinctSegments.computeIfAbsent(tier, k -> new HashSet<>());
 
       long dataSourceSegmentSize = 0;
       for (DataSegment dataSegment : druidDataSource.getSegments()) {
@@ -2198,9 +2198,7 @@ public class CachingClusteredClientTest
       serverExpectationList.add(serverExpectations);
       for (int j = 0; j < numChunks; ++j) {
         DruidServer lastServer = servers[random.nextInt(servers.length)];
-        if (!serverExpectations.containsKey(lastServer)) {
-          serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
-        }
+        serverExpectations.computeIfAbsent(lastServer, server -> new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
 
         DataSegment mockSegment = makeMock(mocks, DataSegment.class);
         ServerExpectation<Object> expectation = new ServerExpectation<>(
@@ -428,9 +428,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
       synchronized (counters) {
         DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp());
         final long timestampTruncated = dateTimeTruncated.getMillis();
-        if (!counters.containsKey(timestampTruncated)) {
-          counters.put(timestampTruncated, new AtomicInteger());
-        }
+        counters.putIfAbsent(timestampTruncated, new AtomicInteger());
         final int partitionNum = counters.get(timestampTruncated).getAndIncrement();
         return new SegmentIdWithShardSpec(
             dataSource,
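
A trade-off visible in this hunk: putIfAbsent() always evaluates its value argument, so a throwaway AtomicInteger is allocated even when the bucket already exists, whereas computeIfAbsent() would defer the allocation to the missing-key case. That is the same distinction the commit message draws between the two calls; a tiny illustrative sketch:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    class PartitionCounterSketch
    {
      public static void main(String[] args)
      {
        ConcurrentHashMap<Long, AtomicInteger> counters = new ConcurrentHashMap<>();
        long bucket = 1_554_940_800_000L; // an arbitrary truncated timestamp

        // Allocates a new AtomicInteger on every call, used or not.
        counters.putIfAbsent(bucket, new AtomicInteger());

        // Allocates only when the bucket is missing.
        int partitionNum = counters.computeIfAbsent(bucket, k -> new AtomicInteger()).getAndIncrement();
        System.out.println(partitionNum);
      }
    }
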
@@ -122,9 +122,7 @@ public class SqlLifecycle
     if (queryContext != null) {
       newContext.putAll(queryContext);
     }
-    if (!newContext.containsKey(PlannerContext.CTX_SQL_QUERY_ID)) {
-      newContext.put(PlannerContext.CTX_SQL_QUERY_ID, UUID.randomUUID().toString());
-    }
+    newContext.computeIfAbsent(PlannerContext.CTX_SQL_QUERY_ID, k -> UUID.randomUUID().toString());
     return newContext;
   }
 
@@ -78,9 +78,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
   )
   {
     final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
-    if (!timelines.containsKey(descriptor.getDataSource())) {
-      timelines.put(descriptor.getDataSource(), new VersionedIntervalTimeline<>(Ordering.natural()));
-    }
+    timelines.computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural()));
 
     final VersionedIntervalTimeline<String, Segment> timeline = timelines.get(descriptor.getDataSource());
     timeline.add(descriptor.getInterval(), descriptor.getVersion(), descriptor.getShardSpec().createChunk(segment));