From 3bee6adcf7338b501687f11ea866608e873c5b0a Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 14 Jun 2019 21:29:36 +0530 Subject: [PATCH] Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764) * https://github.com/apache/incubator-druid/issues/7316 Use Map.putIfAbsent() instead of containsKey() + put() * fixing indentation * Using map.computeIfAbsent() instead of map.putIfAbsent() where appropriate * fixing checkstyle * Changing the recommendation text * Reverting auto changes made by IDE * Implementing recommendation: A ConcurrentHashMap on which computeIfAbsent() is called should be assigned into variables of ConcurrentHashMap type, not ConcurrentMap * Removing unused import --- .idea/inspectionProfiles/Druid.xml | 7 +- .../storage/s3/S3DataSegmentMoverTest.java | 4 +- .../indexer/DetermineHashedPartitionsJob.java | 4 +- .../ActionBasedUsedSegmentChecker.java | 5 +- .../druid/indexing/common/task/IndexTask.java | 8 +- .../IngestSegmentFirehoseFactory.java | 147 ++--- .../indexing/overlord/ForkingTaskRunner.java | 568 +++++++++--------- .../SQLMetadataSupervisorManager.java | 4 +- .../server/http/DataSourcesResource.java | 4 +- .../client/CachingClusteredClientTest.java | 4 +- .../StreamAppenderatorDriverTest.java | 4 +- .../org/apache/druid/sql/SqlLifecycle.java | 4 +- .../SpecificSegmentsQuerySegmentWalker.java | 4 +- 13 files changed, 374 insertions(+), 393 deletions(-) diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index 9770c11edbf..ed890c86ec7 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -306,6 +306,11 @@ + + + + + @@ -400,4 +405,4 @@ - \ No newline at end of file + diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java index ec30aa93508..34496d965d6 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java @@ -261,9 +261,7 @@ public class S3DataSegmentMoverTest @Override public PutObjectResult putObject(String bucketName, String key, File file) { - if (!storage.containsKey(bucketName)) { - storage.put(bucketName, new HashSet<>()); - } + storage.putIfAbsent(bucketName, new HashSet<>()); storage.get(bucketName).add(key); return new PutObjectResult(); } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java index c83bc085a2f..17f51723fba 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java @@ -307,9 +307,7 @@ public class DetermineHashedPartitionsJob implements Jobby .getSegmentGranularity() .bucket(DateTimes.utc(inputRow.getTimestampFromEpoch())); - if (!hyperLogLogs.containsKey(interval)) { - hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector()); - } + hyperLogLogs.computeIfAbsent(interval, intv -> HyperLogLogCollector.makeLatestCollector()); } else { final Optional maybeInterval = config.getGranularitySpec() .bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch())); diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java index a1eb90f88e7..96ce6aeae6d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java @@ -50,9 +50,8 @@ public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker // Group by dataSource final Map> identifiersByDataSource = new TreeMap<>(); for (SegmentIdWithShardSpec identifier : identifiers) { - if (!identifiersByDataSource.containsKey(identifier.getDataSource())) { - identifiersByDataSource.put(identifier.getDataSource(), new HashSet<>()); - } + identifiersByDataSource.computeIfAbsent(identifier.getDataSource(), k -> new HashSet<>()); + identifiersByDataSource.get(identifier.getDataSource()).add(identifier); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index b7e512393bf..458c621d396 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -768,9 +768,7 @@ public class IndexTask extends AbstractTask implements ChatHandler } if (determineNumPartitions) { - if (!hllCollectors.containsKey(interval)) { - hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector())); - } + hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector())); List groupKey = Rows.toGroupKey( queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(), @@ -781,9 +779,7 @@ public class IndexTask extends AbstractTask implements ChatHandler } else { // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent() // for the interval and don't instantiate a HLL collector - if (!hllCollectors.containsKey(interval)) { - hllCollectors.put(interval, Optional.absent()); - } + hllCollectors.putIfAbsent(interval, Optional.absent()); } determinePartitionsMeters.incrementProcessed(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 4d1e0bf4e8b..2caf91dca99 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -204,87 +204,87 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory> timeLineSegments = getTimeline(); + final List> timeLineSegments = getTimeline(); - // Download all segments locally. - // Note: this requires enough local storage space to fit all of the segments, even though - // IngestSegmentFirehose iterates over the segments in series. We may want to change this - // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory. 
- final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); - Map segmentFileMap = Maps.newLinkedHashMap(); - for (TimelineObjectHolder holder : timeLineSegments) { - for (PartitionChunk chunk : holder.getObject()) { - final DataSegment segment = chunk.getObject(); - if (!segmentFileMap.containsKey(segment)) { - segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); + // Download all segments locally. + // Note: this requires enough local storage space to fit all of the segments, even though + // IngestSegmentFirehose iterates over the segments in series. We may want to change this + // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory. + final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); + Map segmentFileMap = Maps.newLinkedHashMap(); + for (TimelineObjectHolder holder : timeLineSegments) { + for (PartitionChunk chunk : holder.getObject()) { + final DataSegment segment = chunk.getObject(); + + segmentFileMap.computeIfAbsent(segment, k -> { + try { + return segmentLoader.getSegmentFiles(segment); } - } + catch (SegmentLoadingException e) { + throw new RuntimeException(e); + } + }); + } + } - final List dims; - if (dimensions != null) { - dims = dimensions; - } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) { - dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); - } else { - dims = getUniqueDimensions( - timeLineSegments, - inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() - ); - } + final List dims; + if (dimensions != null) { + dims = dimensions; + } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) { + dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); + } else { + dims = getUniqueDimensions( + timeLineSegments, + inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() + ); + } - final List metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics; + final List metricsList = metrics == null ? 
getUniqueMetrics(timeLineSegments) : metrics; - final List adapters = Lists.newArrayList( - Iterables.concat( - Iterables.transform( - timeLineSegments, - new Function, Iterable>() - { + final List adapters = Lists.newArrayList( + Iterables.concat( + Iterables.transform( + timeLineSegments, + new Function, Iterable>() { + @Override + public Iterable apply(final TimelineObjectHolder holder) + { + return + Iterables.transform( + holder.getObject(), + new Function, WindowedStorageAdapter>() { @Override - public Iterable apply(final TimelineObjectHolder holder) + public WindowedStorageAdapter apply(final PartitionChunk input) { - return - Iterables.transform( - holder.getObject(), - new Function, WindowedStorageAdapter>() - { - @Override - public WindowedStorageAdapter apply(final PartitionChunk input) - { - final DataSegment segment = input.getObject(); - try { - return new WindowedStorageAdapter( - new QueryableIndexStorageAdapter( - indexIO.loadIndex( - Preconditions.checkNotNull( - segmentFileMap.get(segment), - "File for segment %s", segment.getId() - ) - ) - ), - holder.getInterval() - ); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - } - ); + final DataSegment segment = input.getObject(); + try { + return new WindowedStorageAdapter( + new QueryableIndexStorageAdapter( + indexIO.loadIndex( + Preconditions.checkNotNull( + segmentFileMap.get(segment), + "File for segment %s", segment.getId() + ) + ) + ), + holder.getInterval() + ); + } + catch (IOException e) { + throw new RuntimeException(e); + } } } - ) - ) - ); + ); + } + } + ) + ) + ); - final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser); - return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter); - } - catch (SegmentLoadingException e) { - throw new RuntimeException(e); - } + final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser); + return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter); } private long jitter(long input) @@ -508,13 +508,14 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory timelineHolder : Lists.reverse(timelineSegments)) { for (PartitionChunk chunk : timelineHolder.getObject()) { for (String metric : chunk.getObject().getMetrics()) { - if (!uniqueMetrics.containsKey(metric)) { - uniqueMetrics.put(metric, index++); + uniqueMetrics.computeIfAbsent(metric, k -> { + return index[0]++; } + ); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 05f2f52c389..116747b9d7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -84,7 +84,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -108,7 +107,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); /** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. 
*/ - private final ConcurrentMap tasks = new ConcurrentHashMap<>(); + private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); private volatile boolean stopping = false; @@ -214,309 +213,306 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer public ListenableFuture run(final Task task) { synchronized (tasks) { - if (!tasks.containsKey(task.getId())) { - tasks.put( - task.getId(), - new ForkingTaskRunnerWorkItem( - task, - exec.submit( - new Callable() - { - @Override - public TaskStatus call() - { - final String attemptUUID = UUID.randomUUID().toString(); - final File taskDir = taskConfig.getTaskDir(task.getId()); - final File attemptDir = new File(taskDir, attemptUUID); + tasks.computeIfAbsent( + task.getId(), k -> + new ForkingTaskRunnerWorkItem( + task, + exec.submit( + new Callable() { + @Override + public TaskStatus call() + { + final String attemptUUID = UUID.randomUUID().toString(); + final File taskDir = taskConfig.getTaskDir(task.getId()); + final File attemptDir = new File(taskDir, attemptUUID); - final ProcessHolder processHolder; - final String childHost = node.getHost(); - int childPort = -1; - int tlsChildPort = -1; + final ProcessHolder processHolder; + final String childHost = node.getHost(); + int childPort = -1; + int tlsChildPort = -1; - if (node.isEnablePlaintextPort()) { - childPort = portFinder.findUnusedPort(); + if (node.isEnablePlaintextPort()) { + childPort = portFinder.findUnusedPort(); + } + + if (node.isEnableTlsPort()) { + tlsChildPort = portFinder.findUnusedPort(); + } + + final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort); + + try { + final Closer closer = Closer.create(); + try { + if (!attemptDir.mkdirs()) { + throw new IOE("Could not create directories: %s", attemptDir); + } + + final File taskFile = new File(taskDir, "task.json"); + final File statusFile = new File(attemptDir, "status.json"); + final File logFile = new File(taskDir, "log"); + final File reportsFile = new File(attemptDir, "report.json"); + + // time to adjust process holders + synchronized (tasks) { + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId()); + + if (taskWorkItem.shutdown) { + throw new IllegalStateException("Task has been shut down!"); } - if (node.isEnableTlsPort()) { - tlsChildPort = portFinder.findUnusedPort(); + if (taskWorkItem == null) { + log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit(); + throw new ISE("TaskInfo disappeared for task[%s]!", task.getId()); } - final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort); + if (taskWorkItem.processHolder != null) { + log.makeAlert("WTF?! 
TaskInfo already has a processHolder") + .addData("task", task.getId()) + .emit(); + throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId()); + } - try { - final Closer closer = Closer.create(); - try { - if (!attemptDir.mkdirs()) { - throw new IOE("Could not create directories: %s", attemptDir); + final List command = new ArrayList<>(); + final String taskClasspath; + if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { + taskClasspath = Joiner.on(File.pathSeparator).join( + task.getClasspathPrefix(), + config.getClasspath() + ); + } else { + taskClasspath = config.getClasspath(); + } + + command.add(config.getJavaCommand()); + command.add("-cp"); + command.add(taskClasspath); + + Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts())); + Iterables.addAll(command, config.getJavaOptsArray()); + + // Override task specific javaOpts + Object taskJavaOpts = task.getContextValue( + ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY + ); + if (taskJavaOpts != null) { + Iterables.addAll( + command, + new QuotableWhiteSpaceSplitter((String) taskJavaOpts) + ); + } + + for (String propName : props.stringPropertyNames()) { + for (String allowedPrefix : config.getAllowedPrefixes()) { + // See https://github.com/apache/incubator-druid/issues/1841 + if (propName.startsWith(allowedPrefix) + && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) + && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) + ) { + command.add( + StringUtils.format( + "-D%s=%s", + propName, + props.getProperty(propName) + ) + ); } + } + } - final File taskFile = new File(taskDir, "task.json"); - final File statusFile = new File(attemptDir, "status.json"); - final File logFile = new File(taskDir, "log"); - final File reportsFile = new File(attemptDir, "report.json"); - - // time to adjust process holders - synchronized (tasks) { - final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId()); - - if (taskWorkItem.shutdown) { - throw new IllegalStateException("Task has been shut down!"); - } - - if (taskWorkItem == null) { - log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit(); - throw new ISE("TaskInfo disappeared for task[%s]!", task.getId()); - } - - if (taskWorkItem.processHolder != null) { - log.makeAlert("WTF?! 
TaskInfo already has a processHolder") - .addData("task", task.getId()) - .emit(); - throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId()); - } - - final List command = new ArrayList<>(); - final String taskClasspath; - if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { - taskClasspath = Joiner.on(File.pathSeparator).join( - task.getClasspathPrefix(), - config.getClasspath() - ); - } else { - taskClasspath = config.getClasspath(); - } - - command.add(config.getJavaCommand()); - command.add("-cp"); - command.add(taskClasspath); - - Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts())); - Iterables.addAll(command, config.getJavaOptsArray()); - - // Override task specific javaOpts - Object taskJavaOpts = task.getContextValue( - ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY - ); - if (taskJavaOpts != null) { - Iterables.addAll( - command, - new QuotableWhiteSpaceSplitter((String) taskJavaOpts) - ); - } - - for (String propName : props.stringPropertyNames()) { - for (String allowedPrefix : config.getAllowedPrefixes()) { - // See https://github.com/apache/incubator-druid/issues/1841 - if (propName.startsWith(allowedPrefix) - && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) - && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) - ) { - command.add( - StringUtils.format( - "-D%s=%s", - propName, - props.getProperty(propName) - ) - ); - } - } - } - - // Override child JVM specific properties - for (String propName : props.stringPropertyNames()) { - if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { - command.add( - StringUtils.format( - "-D%s=%s", - propName.substring(CHILD_PROPERTY_PREFIX.length()), - props.getProperty(propName) - ) - ); - } - } - - // Override task specific properties - final Map context = task.getContext(); - if (context != null) { - for (String propName : context.keySet()) { - if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { - command.add( - StringUtils.format( - "-D%s=%s", - propName.substring(CHILD_PROPERTY_PREFIX.length()), - task.getContextValue(propName) - ) - ); - } - } - } - - // Add dataSource, taskId and taskType for metrics or logging - command.add( - StringUtils.format( - "-D%s%s=%s", - MonitorsConfig.METRIC_DIMENSION_PREFIX, - DruidMetrics.DATASOURCE, - task.getDataSource() - ) - ); - command.add( - StringUtils.format( - "-D%s%s=%s", - MonitorsConfig.METRIC_DIMENSION_PREFIX, - DruidMetrics.TASK_ID, - task.getId() - ) - ); - command.add( - StringUtils.format( - "-D%s%s=%s", - MonitorsConfig.METRIC_DIMENSION_PREFIX, - DruidMetrics.TASK_TYPE, - task.getType() - ) - ); - - command.add(StringUtils.format("-Ddruid.host=%s", childHost)); - command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort)); - command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort)); - /** - * These are not enabled per default to allow the user to either set or not set them - * Users are highly suggested to be set in druid.indexer.runner.javaOpts - * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int) - * for more information - command.add("-XX:+UseThreadPriorities"); - command.add("-XX:ThreadPriorityPolicy=42"); - */ - - command.add("org.apache.druid.cli.Main"); - command.add("internal"); - command.add("peon"); - command.add(taskFile.toString()); - command.add(statusFile.toString()); - command.add(reportsFile.toString()); - String nodeType = task.getNodeType(); - if (nodeType != null) { - command.add("--nodeType"); - 
command.add(nodeType); - } - - if (!taskFile.exists()) { - jsonMapper.writeValue(taskFile, task); - } - - log.info("Running command: %s", Joiner.on(" ").join(command)); - taskWorkItem.processHolder = new ProcessHolder( - new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), - logFile, - taskLocation.getHost(), - taskLocation.getPort(), - taskLocation.getTlsPort() - ); - - processHolder = taskWorkItem.processHolder; - processHolder.registerWithCloser(closer); - } - - TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); - TaskRunnerUtils.notifyStatusChanged( - listeners, - task.getId(), - TaskStatus.running(task.getId()) + // Override child JVM specific properties + for (String propName : props.stringPropertyNames()) { + if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { + command.add( + StringUtils.format( + "-D%s=%s", + propName.substring(CHILD_PROPERTY_PREFIX.length()), + props.getProperty(propName) + ) ); - - log.info("Logging task %s output to: %s", task.getId(), logFile); - boolean runFailed = true; - - final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND); - - // This will block for a while. So we append the thread information with more details - final String priorThreadName = Thread.currentThread().getName(); - Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId())); - - try (final OutputStream toLogfile = logSink.openStream()) { - ByteStreams.copy(processHolder.process.getInputStream(), toLogfile); - final int statusCode = processHolder.process.waitFor(); - log.info("Process exited with status[%d] for task: %s", statusCode, task.getId()); - if (statusCode == 0) { - runFailed = false; - } - } - finally { - Thread.currentThread().setName(priorThreadName); - // Upload task logs - taskLogPusher.pushTaskLog(task.getId(), logFile); - if (reportsFile.exists()) { - taskLogPusher.pushTaskReports(task.getId(), reportsFile); - } - } - - TaskStatus status; - if (!runFailed) { - // Process exited successfully - status = jsonMapper.readValue(statusFile, TaskStatus.class); - } else { - // Process exited unsuccessfully - status = TaskStatus.failure(task.getId()); - } - - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); - return status; - } - catch (Throwable t) { - throw closer.rethrow(t); - } - finally { - closer.close(); } } - catch (Throwable t) { - log.info(t, "Exception caught during execution"); - throw new RuntimeException(t); + + // Override task specific properties + final Map context = task.getContext(); + if (context != null) { + for (String propName : context.keySet()) { + if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { + command.add( + StringUtils.format( + "-D%s=%s", + propName.substring(CHILD_PROPERTY_PREFIX.length()), + task.getContextValue(propName) + ) + ); + } + } } - finally { - try { - synchronized (tasks) { - final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); - if (taskWorkItem != null && taskWorkItem.processHolder != null) { - taskWorkItem.processHolder.process.destroy(); - } - if (!stopping) { - saveRunningTasks(); - } - } - if (node.isEnablePlaintextPort()) { - portFinder.markPortUnused(childPort); - } - if (node.isEnableTlsPort()) { - portFinder.markPortUnused(tlsChildPort); - } + // Add dataSource, taskId and taskType for metrics or logging + command.add( + StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.DATASOURCE, + task.getDataSource() + ) + ); + command.add( + 
StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.TASK_ID, + task.getId() + ) + ); + command.add( + StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.TASK_TYPE, + task.getType() + ) + ); - try { - if (!stopping && taskDir.exists()) { - log.info("Removing task directory: %s", taskDir); - FileUtils.deleteDirectory(taskDir); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to delete task directory") - .addData("taskDir", taskDir.toString()) - .addData("task", task.getId()) - .emit(); - } - } - catch (Exception e) { - log.error(e, "Suppressing exception caught while cleaning up task"); - } + command.add(StringUtils.format("-Ddruid.host=%s", childHost)); + command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort)); + command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort)); + /** + * These are not enabled per default to allow the user to either set or not set them + * Users are highly suggested to be set in druid.indexer.runner.javaOpts + * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int) + * for more information + command.add("-XX:+UseThreadPriorities"); + command.add("-XX:ThreadPriorityPolicy=42"); + */ + + command.add("org.apache.druid.cli.Main"); + command.add("internal"); + command.add("peon"); + command.add(taskFile.toString()); + command.add(statusFile.toString()); + command.add(reportsFile.toString()); + String nodeType = task.getNodeType(); + if (nodeType != null) { + command.add("--nodeType"); + command.add(nodeType); + } + + if (!taskFile.exists()) { + jsonMapper.writeValue(taskFile, task); + } + + log.info("Running command: %s", Joiner.on(" ").join(command)); + taskWorkItem.processHolder = new ProcessHolder( + new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), + logFile, + taskLocation.getHost(), + taskLocation.getPort(), + taskLocation.getTlsPort() + ); + + processHolder = taskWorkItem.processHolder; + processHolder.registerWithCloser(closer); + } + + TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); + TaskRunnerUtils.notifyStatusChanged( + listeners, + task.getId(), + TaskStatus.running(task.getId()) + ); + + log.info("Logging task %s output to: %s", task.getId(), logFile); + boolean runFailed = true; + + final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND); + + // This will block for a while. 
So we append the thread information with more details + final String priorThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId())); + + try (final OutputStream toLogfile = logSink.openStream()) { + ByteStreams.copy(processHolder.process.getInputStream(), toLogfile); + final int statusCode = processHolder.process.waitFor(); + log.info("Process exited with status[%d] for task: %s", statusCode, task.getId()); + if (statusCode == 0) { + runFailed = false; } } + finally { + Thread.currentThread().setName(priorThreadName); + // Upload task logs + taskLogPusher.pushTaskLog(task.getId(), logFile); + if (reportsFile.exists()) { + taskLogPusher.pushTaskReports(task.getId(), reportsFile); + } + } + + TaskStatus status; + if (!runFailed) { + // Process exited successfully + status = jsonMapper.readValue(statusFile, TaskStatus.class); + } else { + // Process exited unsuccessfully + status = TaskStatus.failure(task.getId()); + } + + TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); + return status; } - ) + catch (Throwable t) { + throw closer.rethrow(t); + } + finally { + closer.close(); + } + } + catch (Throwable t) { + log.info(t, "Exception caught during execution"); + throw new RuntimeException(t); + } + finally { + try { + synchronized (tasks) { + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); + if (taskWorkItem != null && taskWorkItem.processHolder != null) { + taskWorkItem.processHolder.process.destroy(); + } + if (!stopping) { + saveRunningTasks(); + } + } + + if (node.isEnablePlaintextPort()) { + portFinder.markPortUnused(childPort); + } + if (node.isEnableTlsPort()) { + portFinder.markPortUnused(tlsChildPort); + } + + try { + if (!stopping && taskDir.exists()) { + log.info("Removing task directory: %s", taskDir); + FileUtils.deleteDirectory(taskDir); + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to delete task directory") + .addData("taskDir", taskDir.toString()) + .addData("task", task.getId()) + .emit(); + } + } + catch (Exception e) { + log.error(e, "Suppressing exception caught while cleaning up task"); + } + } + } + } ) - ); - } + ) + ); saveRunningTasks(); return tasks.get(task.getId()).getResult(); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index 83fcc0505c5..a5354b1990b 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -166,9 +166,7 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager { try { String specId = pair.lhs; - if (!retVal.containsKey(specId)) { - retVal.put(specId, new ArrayList<>()); - } + retVal.putIfAbsent(specId, new ArrayList<>()); retVal.get(specId).add(pair.rhs); return retVal; diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 3787cc6b814..9255e7c5145 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -549,9 +549,7 @@ public class DataSourcesResource continue; } - if (!tierDistinctSegments.containsKey(tier)) { - tierDistinctSegments.put(tier, new HashSet<>()); - } + 
tierDistinctSegments.computeIfAbsent(tier, k -> new HashSet<>()); long dataSourceSegmentSize = 0; for (DataSegment dataSegment : druidDataSource.getSegments()) { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index de24926a3ab..8170014c3e6 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2198,9 +2198,7 @@ public class CachingClusteredClientTest serverExpectationList.add(serverExpectations); for (int j = 0; j < numChunks; ++j) { DruidServer lastServer = servers[random.nextInt(servers.length)]; - if (!serverExpectations.containsKey(lastServer)) { - serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class))); - } + serverExpectations.computeIfAbsent(lastServer, server -> new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class))); DataSegment mockSegment = makeMock(mocks, DataSegment.class); ServerExpectation expectation = new ServerExpectation<>( diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 9c3e9ccbe4c..c7475d9f722 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -428,9 +428,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport synchronized (counters) { DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp()); final long timestampTruncated = dateTimeTruncated.getMillis(); - if (!counters.containsKey(timestampTruncated)) { - counters.put(timestampTruncated, new AtomicInteger()); - } + counters.putIfAbsent(timestampTruncated, new AtomicInteger()); final int partitionNum = counters.get(timestampTruncated).getAndIncrement(); return new SegmentIdWithShardSpec( dataSource, diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java index a9c78c48f6f..80cfd0434a4 100644 --- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java +++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java @@ -122,9 +122,7 @@ public class SqlLifecycle if (queryContext != null) { newContext.putAll(queryContext); } - if (!newContext.containsKey(PlannerContext.CTX_SQL_QUERY_ID)) { - newContext.put(PlannerContext.CTX_SQL_QUERY_ID, UUID.randomUUID().toString()); - } + newContext.computeIfAbsent(PlannerContext.CTX_SQL_QUERY_ID, k -> UUID.randomUUID().toString()); return newContext; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index af0e8945432..4d1206b18b7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -78,9 +78,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C ) { final Segment segment = new QueryableIndexSegment(index, descriptor.getId()); - if (!timelines.containsKey(descriptor.getDataSource())) { - 
timelines.put(descriptor.getDataSource(), new VersionedIntervalTimeline<>(Ordering.natural())); - } + timelines.computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural())); final VersionedIntervalTimeline timeline = timelines.get(descriptor.getDataSource()); timeline.add(descriptor.getInterval(), descriptor.getVersion(), descriptor.getShardSpec().createChunk(segment));
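
Editor's note (not part of the patch): the sketch below is a minimal, standalone Java illustration of the idioms this change applies, so the behavioral differences are visible in one place. The map names, keys, and the loadPayload() helper are hypothetical stand-ins; loadPayload() merely plays the role of a value supplier that can throw a checked exception, analogous to SegmentLoader.getSegmentFiles() in the IngestSegmentFirehoseFactory hunk.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PutIfAbsentExamples
{
  public static void main(String[] args) throws Exception
  {
    final Map<String, Set<String>> segmentsByBucket = new HashMap<>();

    // Old pattern replaced throughout this patch: an explicit existence check,
    // then put(), then a separate get() to reach the value.
    if (!segmentsByBucket.containsKey("bucket-a")) {
      segmentsByBucket.put("bucket-a", new HashSet<>());
    }
    segmentsByBucket.get("bucket-a").add("key-1");

    // putIfAbsent(): suitable when the default value is cheap to build, because the
    // argument is constructed even if the key is already present.
    segmentsByBucket.putIfAbsent("bucket-a", new HashSet<>());
    segmentsByBucket.get("bucket-a").add("key-2");

    // computeIfAbsent(): the mapping function runs only when the key is missing, and the
    // call returns the existing or newly created value, so the follow-up get() can go away.
    segmentsByBucket.computeIfAbsent("bucket-b", k -> new HashSet<>()).add("key-3");

    // Checked exceptions thrown while computing the value must be wrapped inside the
    // lambda, as the patch does for SegmentLoadingException in IngestSegmentFirehoseFactory.
    final Map<String, byte[]> cache = new HashMap<>();
    final byte[] payload = cache.computeIfAbsent("segment-1", k -> {
      try {
        return loadPayload(k);
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    });

    // Per the recommendation in the commit message, a map whose computeIfAbsent() must be
    // atomic is declared as ConcurrentHashMap rather than ConcurrentMap: the concrete class
    // overrides computeIfAbsent() atomically, while the interface default makes no such guarantee.
    final ConcurrentHashMap<String, Set<String>> tasks = new ConcurrentHashMap<>();
    tasks.computeIfAbsent("task-1", k -> new HashSet<>()).add("attempt-1");

    System.out.println(segmentsByBucket + " " + payload.length + " " + tasks);
  }

  // Hypothetical loader standing in for a value supplier that declares a checked exception.
  private static byte[] loadPayload(String key) throws Exception
  {
    return key.getBytes("UTF-8");
  }
}

The rule of thumb the patch appears to follow: use putIfAbsent() when the default value is trivial to construct (new HashSet<>(), Optional.absent()), use computeIfAbsent() when building the value is costly or when the returned value is consumed immediately, and keep ConcurrentHashMap-typed variables for maps that rely on atomic computeIfAbsent(), as in ForkingTaskRunner.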