From 723f7ac55095829c57f8d8a8a03aaca97adc2a49 Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Mon, 2 Apr 2018 12:10:56 -0700 Subject: [PATCH] Add support for task reports, upload reports to deep storage (#5524) * Add support for task reports, upload reports to deep storage * PR comments * Better name for method * Fix report file upload * Use TaskReportFileWriter * Checkstyle * More PR comments --- .../java/io/druid/tasklogs/NoopTaskLogs.java | 7 +++ .../java/io/druid/tasklogs/TaskLogPusher.java | 4 ++ .../io/druid/tasklogs/TaskLogStreamer.java | 5 ++ .../io/druid/storage/azure/AzureTaskLogs.java | 30 +++++++++- .../druid/storage/google/GoogleTaskLogs.java | 28 +++++++++ .../storage/hdfs/tasklog/HdfsTaskLogs.java | 38 +++++++++++- .../druid/indexing/kafka/KafkaIndexTask.java | 2 + .../indexing/kafka/KafkaIndexTaskTest.java | 4 +- .../java/io/druid/storage/s3/S3TaskLogs.java | 31 ++++++++-- .../io/druid/indexing/common/TaskReport.java | 54 +++++++++++++++++ .../indexing/common/TaskReportFileWriter.java | 58 +++++++++++++++++++ .../io/druid/indexing/common/TaskToolbox.java | 11 +++- .../indexing/common/TaskToolboxFactory.java | 8 ++- .../AppenderatorDriverRealtimeIndexTask.java | 1 + .../indexing/common/task/HadoopIndexTask.java | 3 + .../druid/indexing/common/task/IndexTask.java | 2 + .../common/tasklogs/FileTaskLogs.java | 40 +++++++++++-- .../tasklogs/SwitchingTaskLogStreamer.java | 13 +++++ .../indexing/overlord/ForkingTaskRunner.java | 5 ++ .../overlord/http/OverlordResource.java | 27 +++++++++ .../worker/executor/ExecutorLifecycle.java | 3 +- .../indexing/common/TaskToolboxTest.java | 4 +- ...penderatorDriverRealtimeIndexTaskTest.java | 3 +- .../common/task/CompactionTaskTest.java | 3 +- .../indexing/common/task/IndexTaskTest.java | 3 +- .../common/task/NoopTestTaskFileWriter.java | 36 ++++++++++++ .../common/task/RealtimeIndexTaskTest.java | 3 +- .../task/SameIntervalMergeTaskTest.java | 3 +- .../IngestSegmentFirehoseFactoryTest.java | 4 +- ...estSegmentFirehoseFactoryTimelineTest.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 4 +- .../worker/WorkerTaskManagerTest.java | 4 +- .../worker/WorkerTaskMonitorTest.java | 4 +- .../src/main/java/io/druid/cli/CliPeon.java | 26 ++++++++- 34 files changed, 444 insertions(+), 31 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/TaskReportFileWriter.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java diff --git a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java index 6fb0f309cfe..d54c63cce18 100644 --- a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java +++ b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java @@ -24,6 +24,7 @@ import com.google.common.io.ByteSource; import io.druid.java.util.common.logger.Logger; import java.io.File; +import java.io.IOException; public class NoopTaskLogs implements TaskLogs { @@ -41,6 +42,12 @@ public class NoopTaskLogs implements TaskLogs log.info("Not pushing logs for task: %s", taskid); } + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + log.info("Not pushing reports for task: %s", taskid); + } + @Override public void killAll() { diff --git a/api/src/main/java/io/druid/tasklogs/TaskLogPusher.java b/api/src/main/java/io/druid/tasklogs/TaskLogPusher.java index a904a16f5d1..6329aac866d 100644 --- a/api/src/main/java/io/druid/tasklogs/TaskLogPusher.java +++ b/api/src/main/java/io/druid/tasklogs/TaskLogPusher.java @@ -31,4 +31,8 @@ import java.io.IOException; public interface TaskLogPusher { void pushTaskLog(String taskid, File logFile) throws IOException; + + default void pushTaskReports(String taskid, File reportFile) throws IOException + { + } } diff --git a/api/src/main/java/io/druid/tasklogs/TaskLogStreamer.java b/api/src/main/java/io/druid/tasklogs/TaskLogStreamer.java index b685c7b7659..7569cdd145b 100644 --- a/api/src/main/java/io/druid/tasklogs/TaskLogStreamer.java +++ b/api/src/main/java/io/druid/tasklogs/TaskLogStreamer.java @@ -40,4 +40,9 @@ public interface TaskLogStreamer * @return input supplier for this log, if available from this provider */ Optional streamTaskLog(String taskid, long offset) throws IOException; + + default Optional streamTaskReports(final String taskid) throws IOException + { + return Optional.absent(); + } } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java index 2cd17c1c41c..8fe5f3b3926 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java @@ -54,7 +54,19 @@ public class AzureTaskLogs implements TaskLogs { final String taskKey = getTaskLogKey(taskid); log.info("Pushing task log %s to: %s", logFile, taskKey); + pushTaskFile(taskid, logFile, taskKey); + } + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + final String taskKey = getTaskReportsKey(taskid); + log.info("Pushing task reports %s to: %s", reportFile, taskKey); + pushTaskFile(taskid, reportFile, taskKey); + } + + private void pushTaskFile(final String taskId, final File logFile, String taskKey) + { try { AzureUtils.retryAzureOperation( () -> { @@ -71,9 +83,19 @@ public class AzureTaskLogs implements TaskLogs @Override public Optional streamTaskLog(final String taskid, final long offset) throws IOException + { + return streamTaskFile(taskid, offset, getTaskLogKey(taskid)); + } + + @Override + public Optional streamTaskReports(String taskid) throws IOException + { + return streamTaskFile(taskid, 0, getTaskReportsKey(taskid)); + } + + private Optional streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException { final String container = config.getContainer(); - final String taskKey = getTaskLogKey(taskid); try { if (!azureStorage.getBlobExists(container, taskKey)) { @@ -116,12 +138,16 @@ public class AzureTaskLogs implements TaskLogs } } - private String getTaskLogKey(String taskid) { return StringUtils.format("%s/%s/log", config.getPrefix(), taskid); } + private String getTaskReportsKey(String taskid) + { + return StringUtils.format("%s/%s/report.json", config.getPrefix(), taskid); + } + @Override public void killAll() { diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java index 6ed64576266..d379f3445ab 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java @@ -51,7 +51,19 @@ public class GoogleTaskLogs implements TaskLogs { final String taskKey = getTaskLogKey(taskid); LOG.info("Pushing task log %s to: %s", logFile, taskKey); + pushTaskFile(taskid, logFile, taskKey); + } + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + final String taskKey = getTaskReportKey(taskid); + LOG.info("Pushing task reports %s to: %s", reportFile, taskKey); + pushTaskFile(taskid, reportFile, taskKey); + } + + private void pushTaskFile(final String taskid, final File logFile, final String taskKey) throws IOException + { FileInputStream fileSteam = new FileInputStream(logFile); InputStreamContent mediaContent = new InputStreamContent("text/plain", fileSteam); @@ -64,7 +76,18 @@ public class GoogleTaskLogs implements TaskLogs public Optional streamTaskLog(final String taskid, final long offset) throws IOException { final String taskKey = getTaskLogKey(taskid); + return streamTaskFile(taskid, offset, taskKey); + } + @Override + public Optional streamTaskReports(String taskid) throws IOException + { + final String taskKey = getTaskReportKey(taskid); + return streamTaskFile(taskid, 0, taskKey); + } + + private Optional streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException + { try { if (!storage.exists(config.getBucket(), taskKey)) { return Optional.absent(); @@ -111,6 +134,11 @@ public class GoogleTaskLogs implements TaskLogs return config.getPrefix() + "/" + taskid.replaceAll(":", "_"); } + private String getTaskReportKey(String taskid) + { + return config.getPrefix() + "/" + taskid.replaceAll(":", "_") + ".report.json"; + } + @Override public void killAll() { diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 5851c2fa48a..61da166187f 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -61,6 +61,21 @@ public class HdfsTaskLogs implements TaskLogs { final Path path = getTaskLogFileFromId(taskId); log.info("Writing task log to: %s", path); + pushTaskFile(path, logFile); + log.info("Wrote task log to: %s", path); + } + + @Override + public void pushTaskReports(String taskId, File reportFile) throws IOException + { + final Path path = getTaskReportsFileFromId(taskId); + log.info("Writing task reports to: %s", path); + pushTaskFile(path, reportFile); + log.info("Wrote task reports to: %s", path); + } + + private void pushTaskFile(Path path, File logFile) throws IOException + { final FileSystem fs = path.getFileSystem(hadoopConfig); try ( final InputStream in = new FileInputStream(logFile); @@ -68,14 +83,24 @@ public class HdfsTaskLogs implements TaskLogs ) { ByteStreams.copy(in, out); } - - log.info("Wrote task log to: %s", path); } @Override public Optional streamTaskLog(final String taskId, final long offset) throws IOException { final Path path = getTaskLogFileFromId(taskId); + return streamTaskFile(path, offset); + } + + @Override + public Optional streamTaskReports(String taskId) throws IOException + { + final Path path = getTaskReportsFileFromId(taskId); + return streamTaskFile(path, 0); + } + + private Optional streamTaskFile(final Path path, final long offset) throws IOException + { final FileSystem fs = path.getFileSystem(hadoopConfig); if (fs.exists(path)) { return Optional.of( @@ -113,6 +138,15 @@ public class HdfsTaskLogs implements TaskLogs return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_"))); } + /** + * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in + * path names. So we format paths differently for HDFS. + */ + private Path getTaskReportsFileFromId(String taskId) + { + return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_") + ".reports.json")); + } + // some hadoop version Path.mergePaths does not exist private static String mergePaths(String path1, String path2) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index a325948a650..82558f0c796 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -904,6 +904,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler toolbox.getDataSegmentServerAnnouncer().unannounce(); } + toolbox.getTaskReportFileWriter().write(null); return success(); } @@ -1272,6 +1273,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler toolbox.getDataSegmentServerAnnouncer().unannounce(); } + toolbox.getTaskReportFileWriter().write(null); return success(); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 8bdd3c7f087..9fb0495284f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -57,6 +57,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.indexing.kafka.supervisor.KafkaSupervisor; import io.druid.indexing.kafka.test.TestBroker; @@ -2032,7 +2033,8 @@ public class KafkaIndexTaskTest EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), - new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0) + new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0), + new NoopTestTaskFileWriter() ); } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java index afef97a8892..e2f6d4310e4 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java @@ -57,8 +57,19 @@ public class S3TaskLogs implements TaskLogs @Override public Optional streamTaskLog(final String taskid, final long offset) throws IOException { - final String taskKey = getTaskLogKey(taskid); + final String taskKey = getTaskLogKey(taskid, "log"); + return streamTaskFile(offset, taskKey); + } + @Override + public Optional streamTaskReports(String taskid) throws IOException + { + final String taskKey = getTaskLogKey(taskid, "report.json"); + return streamTaskFile(0, taskKey); + } + + private Optional streamTaskFile(final long offset, String taskKey) throws IOException + { try { final ObjectMetadata objectMetadata = service.getObjectMetadata(config.getS3Bucket(), taskKey); @@ -107,9 +118,21 @@ public class S3TaskLogs implements TaskLogs @Override public void pushTaskLog(final String taskid, final File logFile) throws IOException { - final String taskKey = getTaskLogKey(taskid); + final String taskKey = getTaskLogKey(taskid, "log"); log.info("Pushing task log %s to: %s", logFile, taskKey); + pushTaskFile(logFile, taskKey); + } + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + final String taskKey = getTaskLogKey(taskid, "report.json"); + log.info("Pushing task reports %s to: %s", reportFile, taskKey); + pushTaskFile(reportFile, taskKey); + } + + private void pushTaskFile(final File logFile, String taskKey) throws IOException + { try { S3Utils.retryS3Operation( () -> { @@ -124,9 +147,9 @@ public class S3TaskLogs implements TaskLogs } } - private String getTaskLogKey(String taskid) + private String getTaskLogKey(String taskid, String filename) { - return StringUtils.format("%s/%s/log", config.getS3Prefix(), taskid); + return StringUtils.format("%s/%s/%s", config.getS3Prefix(), taskid, filename); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java new file mode 100644 index 00000000000..eff6520741b --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java @@ -0,0 +1,54 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.collect.Maps; + +import java.util.Map; + +/** + * TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and + * published segments. They are kept in deep storage along with task logs. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { +}) +public interface TaskReport +{ + String getTaskId(); + + String getReportKey(); + + /** + * @return A JSON-serializable Object that contains a TaskReport's information + */ + Object getPayload(); + + static Map buildTaskReports(TaskReport... taskReports) + { + Map taskReportMap = Maps.newHashMap(); + for (TaskReport taskReport : taskReports) { + taskReportMap.put(taskReport.getReportKey(), taskReport); + } + return taskReportMap; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskReportFileWriter.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskReportFileWriter.java new file mode 100644 index 00000000000..eb5e9d9db40 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskReportFileWriter.java @@ -0,0 +1,58 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.java.util.common.logger.Logger; +import org.apache.commons.io.FileUtils; + +import java.io.File; + +public class TaskReportFileWriter +{ + private static final Logger log = new Logger(TaskReportFileWriter.class); + + private final File reportsFile; + private ObjectMapper objectMapper; + + public TaskReportFileWriter(File reportFile) + { + this.reportsFile = reportFile; + } + + public void write(TaskReport report) + { + try { + final File reportsFileParent = reportsFile.getParentFile(); + if (reportsFileParent != null) { + FileUtils.forceMkdir(reportsFileParent); + } + objectMapper.writeValue(reportsFile, report); + } + catch (Exception e) { + log.error(e, "Encountered exception in write()."); + } + } + + public void setObjectMapper(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index dd132769192..9deab2bc497 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -90,6 +90,7 @@ public class TaskToolbox private final Cache cache; private final CacheConfig cacheConfig; private final IndexMergerV9 indexMergerV9; + private final TaskReportFileWriter taskReportFileWriter; private final DruidNodeAnnouncer druidNodeAnnouncer; private final DruidNode druidNode; @@ -120,7 +121,8 @@ public class TaskToolbox DruidNodeAnnouncer druidNodeAnnouncer, DruidNode druidNode, LookupNodeService lookupNodeService, - DataNodeService dataNodeService + DataNodeService dataNodeService, + TaskReportFileWriter taskReportFileWriter ) { this.config = config; @@ -147,6 +149,8 @@ public class TaskToolbox this.druidNode = druidNode; this.lookupNodeService = lookupNodeService; this.dataNodeService = dataNodeService; + this.taskReportFileWriter = taskReportFileWriter; + this.taskReportFileWriter.setObjectMapper(this.objectMapper); } public TaskConfig getConfig() @@ -303,4 +307,9 @@ public class TaskToolbox { return druidNode; } + + public TaskReportFileWriter getTaskReportFileWriter() + { + return taskReportFileWriter; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index c17b23fe210..1a35ec040f8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -78,6 +78,7 @@ public class TaskToolboxFactory private final DruidNode druidNode; private final LookupNodeService lookupNodeService; private final DataNodeService dataNodeService; + private final TaskReportFileWriter taskReportFileWriter; @Inject public TaskToolboxFactory( @@ -103,7 +104,8 @@ public class TaskToolboxFactory DruidNodeAnnouncer druidNodeAnnouncer, @RemoteChatHandler DruidNode druidNode, LookupNodeService lookupNodeService, - DataNodeService dataNodeService + DataNodeService dataNodeService, + TaskReportFileWriter taskReportFileWriter ) { this.config = config; @@ -129,6 +131,7 @@ public class TaskToolboxFactory this.druidNode = druidNode; this.lookupNodeService = lookupNodeService; this.dataNodeService = dataNodeService; + this.taskReportFileWriter = taskReportFileWriter; } public TaskToolbox build(Task task) @@ -158,7 +161,8 @@ public class TaskToolboxFactory druidNodeAnnouncer, druidNode, lookupNodeService, - dataNodeService + dataNodeService, + taskReportFileWriter ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 3181b252544..cf408e14aaa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -326,6 +326,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask } log.info("Job done!"); + toolbox.getTaskReportFileWriter().write(null); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 4386b5b3595..f8e80e569a3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -229,6 +229,7 @@ public class HadoopIndexTask extends HadoopTask specVersion, version ); + toolbox.getTaskReportFileWriter().write(null); return TaskStatus.failure(getId()); } } @@ -253,8 +254,10 @@ public class HadoopIndexTask extends HadoopTask ); toolbox.publishSegments(publishedSegments); + toolbox.getTaskReportFileWriter().write(null); return TaskStatus.success(getId()); } else { + toolbox.getTaskReportFileWriter().write(null); return TaskStatus.failure(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 7a4dff26c85..08f857ea421 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -262,8 +262,10 @@ public class IndexTask extends AbstractTask } if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, versions, firehoseFactory, firehoseTempDir)) { + toolbox.getTaskReportFileWriter().write(null); return TaskStatus.success(getId()); } else { + toolbox.getTaskReportFileWriter().write(null); return TaskStatus.failure(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java index 1c09b56cd8f..579234e6c07 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java @@ -53,7 +53,7 @@ public class FileTaskLogs implements TaskLogs public void pushTaskLog(final String taskid, File file) throws IOException { if (config.getDirectory().exists() || config.getDirectory().mkdirs()) { - final File outputFile = fileForTask(taskid); + final File outputFile = fileForTask(taskid, file.getName()); Files.copy(file, outputFile); log.info("Wrote task log to: %s", outputFile); } else { @@ -61,10 +61,22 @@ public class FileTaskLogs implements TaskLogs } } + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + if (config.getDirectory().exists() || config.getDirectory().mkdirs()) { + final File outputFile = fileForTask(taskid, reportFile.getName()); + Files.copy(reportFile, outputFile); + log.info("Wrote task report to: %s", outputFile); + } else { + throw new IOE("Unable to create task report dir[%s]", config.getDirectory()); + } + } + @Override public Optional streamTaskLog(final String taskid, final long offset) { - final File file = fileForTask(taskid); + final File file = fileForTask(taskid, "log"); if (file.exists()) { return Optional.of( new ByteSource() @@ -81,9 +93,29 @@ public class FileTaskLogs implements TaskLogs } } - private File fileForTask(final String taskid) + @Override + public Optional streamTaskReports(final String taskid) { - return new File(config.getDirectory(), StringUtils.format("%s.log", taskid)); + final File file = fileForTask(taskid, "report.json"); + if (file.exists()) { + return Optional.of( + new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + return LogUtils.streamFile(file, 0); + } + } + ); + } else { + return Optional.absent(); + } + } + + private File fileForTask(final String taskid, String filename) + { + return new File(config.getDirectory(), StringUtils.format("%s.%s", taskid, filename)); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java index cb8e0b34325..3b7e33baa76 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java @@ -53,4 +53,17 @@ public class SwitchingTaskLogStreamer implements TaskLogStreamer return Optional.absent(); } + + @Override + public Optional streamTaskReports(String taskid) throws IOException + { + for (TaskLogStreamer provider : providers) { + final Optional stream = provider.streamTaskReports(taskid); + if (stream.isPresent()) { + return stream; + } + } + + return Optional.absent(); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 519c172f35f..041dfd879f9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -260,6 +260,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer final File taskFile = new File(taskDir, "task.json"); final File statusFile = new File(attemptDir, "status.json"); final File logFile = new File(taskDir, "log"); + final File reportsFile = new File(attemptDir, "report.json"); // time to adjust process holders synchronized (tasks) { @@ -408,6 +409,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer command.add("peon"); command.add(taskFile.toString()); command.add(statusFile.toString()); + command.add(reportsFile.toString()); String nodeType = task.getNodeType(); if (nodeType != null) { command.add("--nodeType"); @@ -459,6 +461,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer Thread.currentThread().setName(priorThreadName); // Upload task logs taskLogPusher.pushTaskLog(task.getId(), logFile); + if (reportsFile.exists()) { + taskLogPusher.pushTaskReports(task.getId(), reportsFile); + } } TaskStatus status; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 8be1dcdbe95..1fa59cebd1a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -732,6 +732,33 @@ public class OverlordResource } } + @GET + @Path("/task/{taskid}/reports") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(TaskResourceFilter.class) + public Response doGetReports( + @PathParam("taskid") final String taskid + ) + { + try { + final Optional stream = taskLogStreamer.streamTaskReports(taskid); + if (stream.isPresent()) { + return Response.ok(stream.get().openStream()).build(); + } else { + return Response.status(Response.Status.NOT_FOUND) + .entity( + "No task reports were found for this task. " + + "The task may not exist, or it may not have completed yet." + ) + .build(); + } + } + catch (Exception e) { + log.warn(e, "Failed to stream task reports for task %s", taskid); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } + } + @GET @Path("/dataSources/{dataSource}") @Produces(MediaType.APPLICATION_JSON) diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java index d06830eaf6d..98e6382a43a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -37,6 +37,7 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; @@ -190,7 +191,7 @@ public class ExecutorLifecycle final File statusFileParent = statusFile.getParentFile(); if (statusFileParent != null) { - statusFileParent.mkdirs(); + FileUtils.forceMkdir(statusFileParent); } jsonMapper.writeValue(statusFile, taskStatus); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 52bfe1ea891..f73da9d2128 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -25,6 +25,7 @@ import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.java.util.common.Intervals; import io.druid.java.util.emitter.service.ServiceEmitter; @@ -114,7 +115,8 @@ public class TaskToolboxTest null, null, null, - null + null, + new NoopTestTaskFileWriter() ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 2761aacbdb6..abd451a9ed3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1243,7 +1243,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), - new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) + new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0), + new NoopTestTaskFileWriter() ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index 440ed912f0e..27ecd4bb66a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -507,7 +507,8 @@ public class CompactionTaskTest null, null, null, - null + null, + new NoopTestTaskFileWriter() ); this.segmentFileMap = segmentFileMap; } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index f449c3ab05d..8106c79122e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -1043,7 +1043,8 @@ public class IndexTaskTest null, null, null, - null + null, + new NoopTestTaskFileWriter() ); indexTask.isReady(box.getTaskActionClient()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java b/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java new file mode 100644 index 00000000000..cebee6c624f --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java @@ -0,0 +1,36 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import io.druid.indexing.common.TaskReport; +import io.druid.indexing.common.TaskReportFileWriter; + +public class NoopTestTaskFileWriter extends TaskReportFileWriter +{ + public NoopTestTaskFileWriter() + { + super(null); + } + + @Override + public void write(TaskReport report) + { + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 518d1d63b5d..916c925e0d0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -1084,7 +1084,8 @@ public class RealtimeIndexTaskTest EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), - new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) + new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0), + new NoopTestTaskFileWriter() ); return toolboxFactory.build(task); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index c1cd914a8a3..e3c232db23c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -256,7 +256,8 @@ public class SameIntervalMergeTaskTest null, null, null, - null + null, + new NoopTestTaskFileWriter() ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index e311b514da8..d85a1e44bd2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -50,6 +50,7 @@ import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.TaskLockbox; @@ -312,7 +313,8 @@ public class IngestSegmentFirehoseFactoryTest null, null, null, - null + null, + new NoopTestTaskFileWriter() ); Collection values = new LinkedList<>(); for (InputRowParser parser : Arrays.asList( diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index ff1a738260b..14a52cbfcad 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -47,6 +47,7 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Intervals; @@ -343,7 +344,8 @@ public class IngestSegmentFirehoseFactoryTimelineTest null, null, null, - null + null, + new NoopTestTaskFileWriter() ); final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory( DATA_SOURCE, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 5fb348fc363..c17452f654a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -63,6 +63,7 @@ import io.druid.indexing.common.task.IndexTask.IndexIOConfig; import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.indexing.common.task.KillTask; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; @@ -605,7 +606,8 @@ public class TaskLifecycleTest EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), - new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) + new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0), + new NoopTestTaskFileWriter() ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java index 3b3ca7393fb..9be677ae07b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java @@ -35,6 +35,7 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.ThreadPoolTaskRunner; @@ -125,7 +126,8 @@ public class WorkerTaskManagerTest null, null, null, - null + null, + new NoopTestTaskFileWriter() ), taskConfig, new NoopServiceEmitter(), diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 1b74f037738..51d27f0b2a4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -36,6 +36,7 @@ import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; @@ -190,7 +191,8 @@ public class WorkerTaskMonitorTest null, null, null, - null + null, + new NoopTestTaskFileWriter() ), taskConfig, new NoopServiceEmitter(), diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index a205dfab01b..d7da80bae62 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -54,6 +54,7 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; +import io.druid.indexing.common.TaskReportFileWriter; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.RemoteTaskActionClientFactory; @@ -113,9 +114,18 @@ import java.util.Set; ) public class CliPeon extends GuiceRunnable { - @Arguments(description = "task.json status.json", required = true) + @Arguments(description = "task.json status.json report.json", required = true) public List taskAndStatusFile; + // path to store the task's stdout log + private String taskLogPath; + + // path to store the task's TaskStatus + private String taskStatusPath; + + // path to store the task's TaskReport objects + private String taskReportPath; + @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK") public String nodeType = "indexer-executor"; @@ -141,6 +151,10 @@ public class CliPeon extends GuiceRunnable @Override public void configure(Binder binder) { + taskLogPath = taskAndStatusFile.get(0); + taskStatusPath = taskAndStatusFile.get(1); + taskReportPath = taskAndStatusFile.get(2); + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/peon"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); @@ -183,8 +197,14 @@ public class CliPeon extends GuiceRunnable LifecycleModule.register(binder, ExecutorLifecycle.class); binder.bind(ExecutorLifecycleConfig.class).toInstance( new ExecutorLifecycleConfig() - .setTaskFile(new File(taskAndStatusFile.get(0))) - .setStatusFile(new File(taskAndStatusFile.get(1))) + .setTaskFile(new File(taskLogPath)) + .setStatusFile(new File(taskStatusPath)) + ); + + binder.bind(TaskReportFileWriter.class).toInstance( + new TaskReportFileWriter( + new File(taskReportPath) + ) ); binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);