Add support for task reports, upload reports to deep storage (#5524)

* Add support for task reports, upload reports to deep storage

* PR comments

* Better name for method

* Fix report file upload

* Use TaskReportFileWriter

* Checkstyle

* More PR comments
Jonathan Wei 2018-04-02 12:10:56 -07:00 committed by GitHub
parent 6feac204e3
commit 723f7ac550
34 changed files with 444 additions and 31 deletions
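
At a high level, reports now flow through the same plumbing as task logs: the peon writes report.json via TaskReportFileWriter, ForkingTaskRunner uploads it with TaskLogPusher.pushTaskReports(), and the Overlord serves it back through TaskLogStreamer.streamTaskReports(). Below is a minimal sketch of a TaskLogs implementation wiring the two new hooks; the class name, directory layout, and import paths are illustrative assumptions, not code from this commit:

import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import io.druid.tasklogs.NoopTaskLogs;

import java.io.File;
import java.io.IOException;

// Hypothetical: stores reports next to logs in one local directory,
// overriding only the report hooks added by this commit.
public class LocalDirTaskLogs extends NoopTaskLogs
{
  private final File dir;

  public LocalDirTaskLogs(File dir)
  {
    this.dir = dir;
  }

  @Override
  public void pushTaskReports(String taskid, File reportFile) throws IOException
  {
    // Invoked by the task runner after the peon exits, if report.json exists.
    Files.copy(reportFile, new File(dir, taskid + ".report.json"));
  }

  @Override
  public Optional<ByteSource> streamTaskReports(String taskid)
  {
    // Backs the Overlord's new /task/{taskid}/reports endpoint.
    final File file = new File(dir, taskid + ".report.json");
    return file.exists() ? Optional.of(Files.asByteSource(file)) : Optional.<ByteSource>absent();
  }
}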

@@ -24,6 +24,7 @@ import com.google.common.io.ByteSource;
import io.druid.java.util.common.logger.Logger;
import java.io.File;
import java.io.IOException;
public class NoopTaskLogs implements TaskLogs
{
@@ -41,6 +42,12 @@ public class NoopTaskLogs implements TaskLogs
log.info("Not pushing logs for task: %s", taskid);
}
@Override
public void pushTaskReports(String taskid, File reportFile) throws IOException
{
log.info("Not pushing reports for task: %s", taskid);
}
@Override
public void killAll()
{

@@ -31,4 +31,8 @@ import java.io.IOException;
public interface TaskLogPusher
{
void pushTaskLog(String taskid, File logFile) throws IOException;
default void pushTaskReports(String taskid, File reportFile) throws IOException
{
}
}

@@ -40,4 +40,9 @@ public interface TaskLogStreamer
* @return input supplier for this log, if available from this provider
*/
Optional<ByteSource> streamTaskLog(String taskid, long offset) throws IOException;
default Optional<ByteSource> streamTaskReports(final String taskid) throws IOException
{
return Optional.absent();
}
}
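
Both new methods above are Java 8 default methods (a no-op push and an absent stream), so TaskLogPusher and TaskLogStreamer implementations outside this patch keep compiling untouched. A sketch, with a hypothetical class name and an assumed import path:

import io.druid.tasklogs.TaskLogPusher;

import java.io.File;
import java.io.IOException;

// Compiles unchanged against the extended interface: pushTaskReports()
// falls back to the no-op default declared above.
public class LegacyLogPusher implements TaskLogPusher
{
  @Override
  public void pushTaskLog(String taskid, File logFile) throws IOException
  {
    // pre-existing log upload logic only
  }
}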

@@ -54,7 +54,19 @@ public class AzureTaskLogs implements TaskLogs
{
final String taskKey = getTaskLogKey(taskid);
log.info("Pushing task log %s to: %s", logFile, taskKey);
pushTaskFile(taskid, logFile, taskKey);
}
@Override
public void pushTaskReports(String taskid, File reportFile) throws IOException
{
final String taskKey = getTaskReportsKey(taskid);
log.info("Pushing task reports %s to: %s", reportFile, taskKey);
pushTaskFile(taskid, reportFile, taskKey);
}
private void pushTaskFile(final String taskId, final File logFile, String taskKey)
{
try {
AzureUtils.retryAzureOperation(
() -> {
@@ -71,9 +83,19 @@ public class AzureTaskLogs implements TaskLogs
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
{
return streamTaskFile(taskid, offset, getTaskLogKey(taskid));
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
{
return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
}
private Optional<ByteSource> streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException
{
final String container = config.getContainer();
final String taskKey = getTaskLogKey(taskid);
try {
if (!azureStorage.getBlobExists(container, taskKey)) {
@@ -116,12 +138,16 @@ public class AzureTaskLogs implements TaskLogs
}
}
private String getTaskLogKey(String taskid)
{
return StringUtils.format("%s/%s/log", config.getPrefix(), taskid);
}
private String getTaskReportsKey(String taskid)
{
return StringUtils.format("%s/%s/report.json", config.getPrefix(), taskid);
}
@Override
public void killAll()
{

@@ -51,7 +51,19 @@ public class GoogleTaskLogs implements TaskLogs
{
final String taskKey = getTaskLogKey(taskid);
LOG.info("Pushing task log %s to: %s", logFile, taskKey);
pushTaskFile(taskid, logFile, taskKey);
}
@Override
public void pushTaskReports(String taskid, File reportFile) throws IOException
{
final String taskKey = getTaskReportKey(taskid);
LOG.info("Pushing task reports %s to: %s", reportFile, taskKey);
pushTaskFile(taskid, reportFile, taskKey);
}
private void pushTaskFile(final String taskid, final File logFile, final String taskKey) throws IOException
{
FileInputStream fileStream = new FileInputStream(logFile);
InputStreamContent mediaContent = new InputStreamContent("text/plain", fileStream);
@@ -64,7 +76,18 @@ public class GoogleTaskLogs implements TaskLogs
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid);
return streamTaskFile(taskid, offset, taskKey);
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
{
final String taskKey = getTaskReportKey(taskid);
return streamTaskFile(taskid, 0, taskKey);
}
private Optional<ByteSource> streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException
{
try {
if (!storage.exists(config.getBucket(), taskKey)) {
return Optional.absent();
@@ -111,6 +134,11 @@ public class GoogleTaskLogs implements TaskLogs
return config.getPrefix() + "/" + taskid.replaceAll(":", "_");
}
private String getTaskReportKey(String taskid)
{
return config.getPrefix() + "/" + taskid.replaceAll(":", "_") + ".report.json";
}
@Override
public void killAll()
{

@@ -61,6 +61,21 @@ public class HdfsTaskLogs implements TaskLogs
{
final Path path = getTaskLogFileFromId(taskId);
log.info("Writing task log to: %s", path);
pushTaskFile(path, logFile);
log.info("Wrote task log to: %s", path);
}
@Override
public void pushTaskReports(String taskId, File reportFile) throws IOException
{
final Path path = getTaskReportsFileFromId(taskId);
log.info("Writing task reports to: %s", path);
pushTaskFile(path, reportFile);
log.info("Wrote task reports to: %s", path);
}
private void pushTaskFile(Path path, File logFile) throws IOException
{
final FileSystem fs = path.getFileSystem(hadoopConfig);
try (
final InputStream in = new FileInputStream(logFile);
@@ -68,14 +83,24 @@ public class HdfsTaskLogs implements TaskLogs
) {
ByteStreams.copy(in, out);
}
log.info("Wrote task log to: %s", path);
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
return streamTaskFile(path, offset);
}
@Override
public Optional<ByteSource> streamTaskReports(String taskId) throws IOException
{
final Path path = getTaskReportsFileFromId(taskId);
return streamTaskFile(path, 0);
}
private Optional<ByteSource> streamTaskFile(final Path path, final long offset) throws IOException
{
final FileSystem fs = path.getFileSystem(hadoopConfig);
if (fs.exists(path)) {
return Optional.<ByteSource>of(
@@ -113,6 +138,15 @@ public class HdfsTaskLogs implements TaskLogs
return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_")));
}
/**
* Due to https://issues.apache.org/jira/browse/HDFS-13, ":" is not allowed in
* path names, so we format paths differently for HDFS.
*/
private Path getTaskReportsFileFromId(String taskId)
{
return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_") + ".reports.json"));
}
// Path.mergePaths does not exist in some Hadoop versions
private static String mergePaths(String path1, String path2)
{

@@ -904,6 +904,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
toolbox.getDataSegmentServerAnnouncer().unannounce();
}
toolbox.getTaskReportFileWriter().write(null);
return success();
}
@@ -1272,6 +1273,7 @@
toolbox.getDataSegmentServerAnnouncer().unannounce();
}
toolbox.getTaskReportFileWriter().write(null);
return success();
}

@@ -57,6 +57,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.kafka.supervisor.KafkaSupervisor;
import io.druid.indexing.kafka.test.TestBroker;
@@ -2032,7 +2033,8 @@ public class KafkaIndexTaskTest
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0)
new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0),
new NoopTestTaskFileWriter()
);
}

@@ -57,8 +57,19 @@ public class S3TaskLogs implements TaskLogs
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid);
final String taskKey = getTaskLogKey(taskid, "log");
return streamTaskFile(offset, taskKey);
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
return streamTaskFile(0, taskKey);
}
private Optional<ByteSource> streamTaskFile(final long offset, String taskKey) throws IOException
{
try {
final ObjectMetadata objectMetadata = service.getObjectMetadata(config.getS3Bucket(), taskKey);
@@ -107,9 +118,21 @@ public class S3TaskLogs implements TaskLogs
@Override
public void pushTaskLog(final String taskid, final File logFile) throws IOException
{
final String taskKey = getTaskLogKey(taskid);
final String taskKey = getTaskLogKey(taskid, "log");
log.info("Pushing task log %s to: %s", logFile, taskKey);
pushTaskFile(logFile, taskKey);
}
@Override
public void pushTaskReports(String taskid, File reportFile) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
log.info("Pushing task reports %s to: %s", reportFile, taskKey);
pushTaskFile(reportFile, taskKey);
}
private void pushTaskFile(final File logFile, String taskKey) throws IOException
{
try {
S3Utils.retryS3Operation(
() -> {
@@ -124,9 +147,9 @@ public class S3TaskLogs implements TaskLogs
}
}
private String getTaskLogKey(String taskid)
private String getTaskLogKey(String taskid, String filename)
{
return StringUtils.format("%s/%s/log", config.getS3Prefix(), taskid);
return StringUtils.format("%s/%s/%s", config.getS3Prefix(), taskid, filename);
}
@Override

@@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.Maps;
import java.util.Map;
/**
* TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and
* published segments. They are kept in deep storage along with task logs.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
})
public interface TaskReport
{
String getTaskId();
String getReportKey();
/**
* @return A JSON-serializable Object that contains a TaskReport's information
*/
Object getPayload();
static Map<String, TaskReport> buildTaskReports(TaskReport... taskReports)
{
Map<String, TaskReport> taskReportMap = Maps.newHashMap();
for (TaskReport taskReport : taskReports) {
taskReportMap.put(taskReport.getReportKey(), taskReport);
}
return taskReportMap;
}
}
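
No concrete TaskReport implementations ship in this commit — the @JsonSubTypes list is empty and every task still writes null — but the intended shape follows from the interface. A hypothetical report type, showing how buildTaskReports() would index it by report key:

import io.druid.indexing.common.TaskReport;

import java.util.Map;

// Hypothetical report type, not part of this commit.
public class RowStatsReport implements TaskReport
{
  private final String taskId;
  private final Map<String, Object> stats;

  public RowStatsReport(String taskId, Map<String, Object> stats)
  {
    this.taskId = taskId;
    this.stats = stats;
  }

  @Override
  public String getTaskId()
  {
    return taskId;
  }

  @Override
  public String getReportKey()
  {
    // buildTaskReports() files each report under this key.
    return "rowStats";
  }

  @Override
  public Object getPayload()
  {
    return stats;
  }
}

A task could then hand TaskReport.buildTaskReports(new RowStatsReport(getId(), stats)) to whatever serializes the report file.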

@@ -0,0 +1,58 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.io.FileUtils;
import java.io.File;
public class TaskReportFileWriter
{
private static final Logger log = new Logger(TaskReportFileWriter.class);
private final File reportsFile;
private ObjectMapper objectMapper;
public TaskReportFileWriter(File reportFile)
{
this.reportsFile = reportFile;
}
public void write(TaskReport report)
{
try {
final File reportsFileParent = reportsFile.getParentFile();
if (reportsFileParent != null) {
FileUtils.forceMkdir(reportsFileParent);
}
objectMapper.writeValue(reportsFile, report);
}
catch (Exception e) {
log.error(e, "Encountered exception in write().");
}
}
public void setObjectMapper(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
}
}
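
How the writer is wired, roughly: CliPeon constructs it with the report path from the command line, TaskToolbox injects the ObjectMapper via setObjectMapper(), and each task calls write() on completion. A standalone sketch with an illustrative path:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.TaskReportFileWriter;

import java.io.File;

public class ReportWriterExample
{
  public static void main(String[] args)
  {
    // In production the path is the peon's third positional argument.
    TaskReportFileWriter writer = new TaskReportFileWriter(new File("/tmp/task/attempt1/report.json"));
    // TaskToolbox performs this step with the injected mapper.
    writer.setObjectMapper(new ObjectMapper());
    // Tasks in this commit write null; concrete reports come later.
    writer.write(null);
  }
}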

@@ -90,6 +90,7 @@ public class TaskToolbox
private final Cache cache;
private final CacheConfig cacheConfig;
private final IndexMergerV9 indexMergerV9;
private final TaskReportFileWriter taskReportFileWriter;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
@@ -120,7 +121,8 @@ public class TaskToolbox
DruidNodeAnnouncer druidNodeAnnouncer,
DruidNode druidNode,
LookupNodeService lookupNodeService,
DataNodeService dataNodeService
DataNodeService dataNodeService,
TaskReportFileWriter taskReportFileWriter
)
{
this.config = config;
@@ -147,6 +149,8 @@
this.druidNode = druidNode;
this.lookupNodeService = lookupNodeService;
this.dataNodeService = dataNodeService;
this.taskReportFileWriter = taskReportFileWriter;
this.taskReportFileWriter.setObjectMapper(this.objectMapper);
}
public TaskConfig getConfig()
@@ -303,4 +307,9 @@
{
return druidNode;
}
public TaskReportFileWriter getTaskReportFileWriter()
{
return taskReportFileWriter;
}
}

@@ -78,6 +78,7 @@ public class TaskToolboxFactory
private final DruidNode druidNode;
private final LookupNodeService lookupNodeService;
private final DataNodeService dataNodeService;
private final TaskReportFileWriter taskReportFileWriter;
@Inject
public TaskToolboxFactory(
@@ -103,7 +104,8 @@
DruidNodeAnnouncer druidNodeAnnouncer,
@RemoteChatHandler DruidNode druidNode,
LookupNodeService lookupNodeService,
DataNodeService dataNodeService
DataNodeService dataNodeService,
TaskReportFileWriter taskReportFileWriter
)
{
this.config = config;
@@ -129,6 +131,7 @@
this.druidNode = druidNode;
this.lookupNodeService = lookupNodeService;
this.dataNodeService = dataNodeService;
this.taskReportFileWriter = taskReportFileWriter;
}
public TaskToolbox build(Task task)
@@ -158,7 +161,8 @@
druidNodeAnnouncer,
druidNode,
lookupNodeService,
dataNodeService
dataNodeService,
taskReportFileWriter
);
}
}

@@ -326,6 +326,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask
}
log.info("Job done!");
toolbox.getTaskReportFileWriter().write(null);
return TaskStatus.success(getId());
}

@@ -229,6 +229,7 @@ public class HadoopIndexTask extends HadoopTask
specVersion,
version
);
toolbox.getTaskReportFileWriter().write(null);
return TaskStatus.failure(getId());
}
}
@@ -253,8 +254,10 @@
);
toolbox.publishSegments(publishedSegments);
toolbox.getTaskReportFileWriter().write(null);
return TaskStatus.success(getId());
} else {
toolbox.getTaskReportFileWriter().write(null);
return TaskStatus.failure(getId());
}
}

@@ -262,8 +262,10 @@ public class IndexTask extends AbstractTask
}
if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, versions, firehoseFactory, firehoseTempDir)) {
toolbox.getTaskReportFileWriter().write(null);
return TaskStatus.success(getId());
} else {
toolbox.getTaskReportFileWriter().write(null);
return TaskStatus.failure(getId());
}
}

@@ -53,7 +53,7 @@ public class FileTaskLogs implements TaskLogs
public void pushTaskLog(final String taskid, File file) throws IOException
{
if (config.getDirectory().exists() || config.getDirectory().mkdirs()) {
final File outputFile = fileForTask(taskid);
final File outputFile = fileForTask(taskid, file.getName());
Files.copy(file, outputFile);
log.info("Wrote task log to: %s", outputFile);
} else {
@@ -61,10 +61,22 @@
}
}
@Override
public void pushTaskReports(String taskid, File reportFile) throws IOException
{
if (config.getDirectory().exists() || config.getDirectory().mkdirs()) {
final File outputFile = fileForTask(taskid, reportFile.getName());
Files.copy(reportFile, outputFile);
log.info("Wrote task report to: %s", outputFile);
} else {
throw new IOE("Unable to create task report dir[%s]", config.getDirectory());
}
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
{
final File file = fileForTask(taskid);
final File file = fileForTask(taskid, "log");
if (file.exists()) {
return Optional.<ByteSource>of(
new ByteSource()
@@ -81,9 +93,29 @@
}
}
private File fileForTask(final String taskid)
@Override
public Optional<ByteSource> streamTaskReports(final String taskid)
{
return new File(config.getDirectory(), StringUtils.format("%s.log", taskid));
final File file = fileForTask(taskid, "report.json");
if (file.exists()) {
return Optional.<ByteSource>of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return LogUtils.streamFile(file, 0);
}
}
);
} else {
return Optional.absent();
}
}
private File fileForTask(final String taskid, String filename)
{
return new File(config.getDirectory(), StringUtils.format("%s.%s", taskid, filename));
}
@Override

@@ -53,4 +53,17 @@ public class SwitchingTaskLogStreamer implements TaskLogStreamer
return Optional.absent();
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
{
for (TaskLogStreamer provider : providers) {
final Optional<ByteSource> stream = provider.streamTaskReports(taskid);
if (stream.isPresent()) {
return stream;
}
}
return Optional.absent();
}
}

@@ -260,6 +260,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
final File taskFile = new File(taskDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(taskDir, "log");
final File reportsFile = new File(attemptDir, "report.json");
// time to adjust process holders
synchronized (tasks) {
@@ -408,6 +409,7 @@
command.add("peon");
command.add(taskFile.toString());
command.add(statusFile.toString());
command.add(reportsFile.toString());
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add("--nodeType");
@@ -459,6 +461,9 @@
Thread.currentThread().setName(priorThreadName);
// Upload task logs
taskLogPusher.pushTaskLog(task.getId(), logFile);
if (reportsFile.exists()) {
taskLogPusher.pushTaskReports(task.getId(), reportsFile);
}
}
TaskStatus status;

@@ -732,6 +732,33 @@ public class OverlordResource
}
}
@GET
@Path("/task/{taskid}/reports")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(TaskResourceFilter.class)
public Response doGetReports(
@PathParam("taskid") final String taskid
)
{
try {
final Optional<ByteSource> stream = taskLogStreamer.streamTaskReports(taskid);
if (stream.isPresent()) {
return Response.ok(stream.get().openStream()).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(
"No task reports were found for this task. "
+ "The task may not exist, or it may not have completed yet."
)
.build();
}
}
catch (Exception e) {
log.warn(e, "Failed to stream task reports for task %s", taskid);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
@GET
@Path("/dataSources/{dataSource}")
@Produces(MediaType.APPLICATION_JSON)
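
The new route hangs off the Overlord's /druid/indexer/v1 resource, so the full path is GET /druid/indexer/v1/task/{taskid}/reports, returning report.json or 404. A minimal client sketch; the host, port, and task id are placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class FetchTaskReports
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder Overlord address and task id.
    URL url = new URL("http://localhost:8090/druid/indexer/v1/task/index_wiki_2018/reports");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    if (conn.getResponseCode() == 200) {
      try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line); // contents of report.json
        }
      }
    } else {
      // 404: unknown task, or the task has not pushed reports yet.
      System.out.println("HTTP " + conn.getResponseCode());
    }
  }
}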

@@ -37,6 +37,7 @@ import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
@@ -190,7 +191,7 @@ public class ExecutorLifecycle
final File statusFileParent = statusFile.getParentFile();
if (statusFileParent != null) {
statusFileParent.mkdirs();
FileUtils.forceMkdir(statusFileParent);
}
jsonMapper.writeValue(statusFile, taskStatus);

@@ -25,6 +25,7 @@ import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.emitter.service.ServiceEmitter;
@@ -114,7 +115,8 @@ public class TaskToolboxTest
null,
null,
null,
null
null,
new NoopTestTaskFileWriter()
);
}

@@ -1243,7 +1243,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0),
new NoopTestTaskFileWriter()
);
}

@@ -507,7 +507,8 @@ public class CompactionTaskTest
null,
null,
null,
null
null,
new NoopTestTaskFileWriter()
);
this.segmentFileMap = segmentFileMap;
}

@@ -1043,7 +1043,8 @@ public class IndexTaskTest
null,
null,
null,
null
null,
new NoopTestTaskFileWriter()
);
indexTask.isReady(box.getTaskActionClient());

@@ -0,0 +1,36 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
import io.druid.indexing.common.TaskReport;
import io.druid.indexing.common.TaskReportFileWriter;
public class NoopTestTaskFileWriter extends TaskReportFileWriter
{
public NoopTestTaskFileWriter()
{
super(null);
}
@Override
public void write(TaskReport report)
{
}
}

@@ -1084,7 +1084,8 @@ public class RealtimeIndexTaskTest
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0),
new NoopTestTaskFileWriter()
);
return toolboxFactory.build(task);

@@ -256,7 +256,8 @@ public class SameIntervalMergeTaskTest
null,
null,
null,
null
null,
new NoopTestTaskFileWriter()
)
);

@@ -50,6 +50,7 @@ import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
@@ -312,7 +313,8 @@ public class IngestSegmentFirehoseFactoryTest
null,
null,
null,
null
null,
new NoopTestTaskFileWriter()
);
Collection<Object[]> values = new LinkedList<>();
for (InputRowParser parser : Arrays.<InputRowParser>asList(

@@ -47,6 +47,7 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
@@ -343,7 +344,8 @@ public class IngestSegmentFirehoseFactoryTimelineTest
null,
null,
null,
null
null,
new NoopTestTaskFileWriter()
);
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
DATA_SOURCE,

@@ -63,6 +63,7 @@ import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.indexing.common.task.KillTask;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
@@ -605,7 +606,8 @@ public class TaskLifecycleTest
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0),
new NoopTestTaskFileWriter()
);
}

@@ -35,6 +35,7 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
@@ -125,7 +126,8 @@ public class WorkerTaskManagerTest
null,
null,
null,
null
null,
new NoopTestTaskFileWriter()
),
taskConfig,
new NoopServiceEmitter(),

@@ -36,6 +36,7 @@ import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
@@ -190,7 +191,8 @@ public class WorkerTaskMonitorTest
null,
null,
null,
null
null,
new NoopTestTaskFileWriter()
),
taskConfig,
new NoopServiceEmitter(),

@@ -54,6 +54,7 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskReportFileWriter;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.RemoteTaskActionClientFactory;
@@ -113,9 +114,18 @@
)
public class CliPeon extends GuiceRunnable
{
@Arguments(description = "task.json status.json", required = true)
@Arguments(description = "task.json status.json report.json", required = true)
public List<String> taskAndStatusFile;
// path to store the task's stdout log
private String taskLogPath;
// path to store the task's TaskStatus
private String taskStatusPath;
// path to store the task's TaskReport objects
private String taskReportPath;
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String nodeType = "indexer-executor";
@@ -141,6 +151,10 @@ public class CliPeon extends GuiceRunnable
@Override
public void configure(Binder binder)
{
taskLogPath = taskAndStatusFile.get(0);
taskStatusPath = taskAndStatusFile.get(1);
taskReportPath = taskAndStatusFile.get(2);
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/peon");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
@@ -183,8 +197,14 @@
LifecycleModule.register(binder, ExecutorLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(
new ExecutorLifecycleConfig()
.setTaskFile(new File(taskAndStatusFile.get(0)))
.setStatusFile(new File(taskAndStatusFile.get(1)))
.setTaskFile(new File(taskLogPath))
.setStatusFile(new File(taskStatusPath))
);
binder.bind(TaskReportFileWriter.class).toInstance(
new TaskReportFileWriter(
new File(taskReportPath)
)
);
binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);