Cleanup serialization of TaskReportMap (#16217)

* Build task reports in AbstractBatchIndexTask

* Minor cleanup

* Apply suggestions from code review by @abhishekrb

Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com>

* Cleanup IndexTaskTest

* Fix formatting

* Fix coverage

* Cleanup serialization of TaskReport map

* Replace occurrences of Map<String, TaskReport>

* Return TaskReport.ReportMap for live reports, fix test comparisons

* Address test failures

---------

Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com>
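In short, this commit threads a dedicated TaskReport.ReportMap type through every place that previously passed around a raw report map. A hedged before/after sketch, using two signatures that appear in the diff below:

// Before: report maps were plain java.util.Map instances, so the values were
// serialized without their polymorphic type information.
Map<String, TaskReport> getCompletionReports();
void write(String taskId, Map<String, TaskReport> reports);

// After: ReportMap (a LinkedHashMap<String, TaskReport>) preserves report order,
// keeps the subtype information through serde, and adds a typed findReport() lookup.
TaskReport.ReportMap getCompletionReports();
void write(String taskId, TaskReport.ReportMap reports);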
Kashif Faraz 2024-04-02 00:23:24 +05:30 committed by GitHub
parent 1aa6808b9a
commit 0de44d91f1
44 changed files with 387 additions and 254 deletions

View File

@ -32,7 +32,6 @@ import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Interface for the controller of a multi-stage query. * Interface for the controller of a multi-stage query.
@ -123,6 +122,6 @@ public interface Controller
List<String> getTaskIds(); List<String> getTaskIds();
@Nullable @Nullable
Map<String, TaskReport> liveReports(); TaskReport.ReportMap liveReports();
} }

View File

@ -28,8 +28,6 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import java.util.Map;
/** /**
* Context used by multi-stage query controllers. * Context used by multi-stage query controllers.
* *
@ -80,5 +78,5 @@ public interface ControllerContext
/** /**
* Writes controller task report. * Writes controller task report.
*/ */
void writeReports(String controllerTaskId, Map<String, TaskReport> reports); void writeReports(String controllerTaskId, TaskReport.ReportMap reports);
} }

View File

@ -916,7 +916,7 @@ public class ControllerImpl implements Controller
@Override @Override
@Nullable @Nullable
public Map<String, TaskReport> liveReports() public TaskReport.ReportMap liveReports()
{ {
final QueryDefinition queryDef = queryDefRef.get(); final QueryDefinition queryDef = queryDefRef.get();

View File

@ -41,8 +41,6 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import java.util.Map;
/** /**
* Implementation for {@link ControllerContext} required to run multi-stage queries as indexing tasks. * Implementation for {@link ControllerContext} required to run multi-stage queries as indexing tasks.
*/ */
@ -126,7 +124,7 @@ public class IndexerControllerContext implements ControllerContext
} }
@Override @Override
public void writeReports(String controllerTaskId, Map<String, TaskReport> reports) public void writeReports(String controllerTaskId, TaskReport.ReportMap reports)
{ {
toolbox.getTaskReportFileWriter().write(controllerTaskId, reports); toolbox.getTaskReportFileWriter().write(controllerTaskId, reports);
} }

View File

@ -45,7 +45,6 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.util.List; import java.util.List;
import java.util.Map;
public class ControllerChatHandler implements ChatHandler public class ControllerChatHandler implements ChatHandler
{ {
@ -189,7 +188,7 @@ public class ControllerChatHandler implements ChatHandler
public Response httpGetLiveReports(@Context final HttpServletRequest req) public Response httpGetLiveReports(@Context final HttpServletRequest req)
{ {
ChatHandlers.authorizationCheck(req, Action.WRITE, task.getDataSource(), toolbox.getAuthorizerMapper()); ChatHandlers.authorizationCheck(req, Action.WRITE, task.getDataSource(), toolbox.getAuthorizerMapper());
final Map<String, TaskReport> reports = controller.liveReports(); final TaskReport.ReportMap reports = controller.liveReports();
if (reports == null) { if (reports == null) {
return Response.status(Response.Status.NOT_FOUND).build(); return Response.status(Response.Status.NOT_FOUND).build();
} }

View File

@ -53,7 +53,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
public class WorkerChatHandlerTest public class WorkerChatHandlerTest
{ {
@ -88,7 +87,7 @@ public class WorkerChatHandlerTest
new TaskReportFileWriter() new TaskReportFileWriter()
{ {
@Override @Override
public void write(String taskId, Map<String, TaskReport> reports) public void write(String taskId, TaskReport.ReportMap reports)
{ {
} }

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.client;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.server.security.AuthorizerMapper;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
public class ControllerChatHandlerTest
{
@Test
public void testHttpGetLiveReports()
{
final Controller controller = Mockito.mock(Controller.class);
TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
reportMap.put("killUnusedSegments", new KillTaskReport("kill_1", new KillTaskReport.Stats(1, 2, 3)));
Mockito.when(controller.liveReports())
.thenReturn(reportMap);
MSQControllerTask task = Mockito.mock(MSQControllerTask.class);
Mockito.when(task.getDataSource())
.thenReturn("wiki");
Mockito.when(controller.task())
.thenReturn(task);
TaskToolbox toolbox = Mockito.mock(TaskToolbox.class);
Mockito.when(toolbox.getAuthorizerMapper())
.thenReturn(new AuthorizerMapper(null));
ControllerChatHandler chatHandler = new ControllerChatHandler(toolbox, controller);
HttpServletRequest httpRequest = Mockito.mock(HttpServletRequest.class);
Mockito.when(httpRequest.getAttribute(ArgumentMatchers.anyString()))
.thenReturn("allow-all");
Response response = chatHandler.httpGetLiveReports(httpRequest);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(reportMap, response.getEntity());
}
}

View File

@ -57,7 +57,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
public class MSQTaskReportTest public class MSQTaskReportTest
{ {
@ -242,9 +241,9 @@ public class MSQTaskReportTest
writer.setObjectMapper(mapper); writer.setObjectMapper(mapper);
writer.write(TASK_ID, TaskReport.buildTaskReports(report)); writer.write(TASK_ID, TaskReport.buildTaskReports(report));
final Map<String, TaskReport> reportMap = mapper.readValue( final TaskReport.ReportMap reportMap = mapper.readValue(
reportFile, reportFile,
new TypeReference<Map<String, TaskReport>>() new TypeReference<TaskReport.ReportMap>()
{ {
} }
); );

View File

@ -88,7 +88,7 @@ public class MSQTestControllerContext implements ControllerContext
private final ServiceEmitter emitter = new NoopServiceEmitter(); private final ServiceEmitter emitter = new NoopServiceEmitter();
private Controller controller; private Controller controller;
private Map<String, TaskReport> report = null; private TaskReport.ReportMap report = null;
private final WorkerMemoryParameters workerMemoryParameters; private final WorkerMemoryParameters workerMemoryParameters;
public MSQTestControllerContext( public MSQTestControllerContext(
@ -273,14 +273,14 @@ public class MSQTestControllerContext implements ControllerContext
} }
@Override @Override
public void writeReports(String controllerTaskId, Map<String, TaskReport> taskReport) public void writeReports(String controllerTaskId, TaskReport.ReportMap taskReport)
{ {
if (controller != null && controller.id().equals(controllerTaskId)) { if (controller != null && controller.id().equals(controllerTaskId)) {
report = taskReport; report = taskReport;
} }
} }
public Map<String, TaskReport> getAllReports() public TaskReport.ReportMap getAllReports()
{ {
return report; return report;
} }

View File

@ -55,7 +55,7 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient
private final WorkerMemoryParameters workerMemoryParameters; private final WorkerMemoryParameters workerMemoryParameters;
private final List<ImmutableSegmentLoadInfo> loadedSegmentMetadata; private final List<ImmutableSegmentLoadInfo> loadedSegmentMetadata;
private final Map<String, Controller> inMemoryControllers = new HashMap<>(); private final Map<String, Controller> inMemoryControllers = new HashMap<>();
private final Map<String, Map<String, TaskReport>> reports = new HashMap<>(); private final Map<String, TaskReport.ReportMap> reports = new HashMap<>();
private final Map<String, MSQControllerTask> inMemoryControllerTask = new HashMap<>(); private final Map<String, MSQControllerTask> inMemoryControllerTask = new HashMap<>();
private final Map<String, TaskStatus> inMemoryTaskStatus = new HashMap<>(); private final Map<String, TaskStatus> inMemoryTaskStatus = new HashMap<>();
@ -171,7 +171,7 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient
// hooks to pull stuff out for testing // hooks to pull stuff out for testing
@Nullable @Nullable
public Map<String, TaskReport> getReportForTask(String id) public TaskReport.ReportMap getReportForTask(String id)
{ {
return reports.get(id); return reports.get(id);
} }

View File

@ -126,7 +126,7 @@ public class MSQTestWorkerContext implements WorkerContext
final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter() final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter()
{ {
@Override @Override
public void write(String taskId, Map<String, TaskReport> reports) public void write(String taskId, TaskReport.ReportMap reports)
{ {
} }

View File

@ -62,6 +62,25 @@ public class KillTaskReport implements TaskReport
return stats; return stats;
} }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KillTaskReport that = (KillTaskReport) o;
return Objects.equals(taskId, that.taskId) && Objects.equals(stats, that.stats);
}
@Override
public int hashCode()
{
return Objects.hash(taskId, stats);
}
public static class Stats public static class Stats
{ {
private final int numSegmentsKilled; private final int numSegmentsKilled;
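The equals() and hashCode() overrides added here are what let the updated tests compare whole report maps directly. A minimal illustration, not part of the commit itself (JUnit's Assert is assumed):

// Two independently built report maps holding equal reports now compare equal;
// previously this assertion only held for the very same KillTaskReport instance.
KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2, 3);
TaskReport.ReportMap first = TaskReport.buildTaskReports(new KillTaskReport("kill_1", stats));
TaskReport.ReportMap second = TaskReport.buildTaskReports(new KillTaskReport("kill_1", stats));
Assert.assertEquals(first, second);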

View File

@ -37,7 +37,7 @@ public class MultipleFileTaskReportFileWriter implements TaskReportFileWriter
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
@Override @Override
public void write(String taskId, Map<String, TaskReport> reports) public void write(String taskId, TaskReport.ReportMap reports)
{ {
final File reportsFile = taskReportFiles.get(taskId); final File reportsFile = taskReportFiles.get(taskId);
if (reportsFile == null) { if (reportsFile == null) {

View File

@ -19,17 +19,13 @@
package org.apache.druid.indexing.common; package org.apache.druid.indexing.common;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Map;
public class SingleFileTaskReportFileWriter implements TaskReportFileWriter public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
{ {
@ -44,7 +40,7 @@ public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
} }
@Override @Override
public void write(String taskId, Map<String, TaskReport> reports) public void write(String taskId, TaskReport.ReportMap reports)
{ {
try { try {
final File reportsFileParent = reportsFile.getParentFile(); final File reportsFileParent = reportsFile.getParentFile();
@ -70,20 +66,9 @@ public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
public static void writeReportToStream( public static void writeReportToStream(
final ObjectMapper objectMapper, final ObjectMapper objectMapper,
final OutputStream outputStream, final OutputStream outputStream,
final Map<String, TaskReport> reports final TaskReport.ReportMap reports
) throws Exception ) throws Exception
{ {
final SerializerProvider serializers = objectMapper.getSerializerProviderInstance(); objectMapper.writeValue(outputStream, reports);
try (final JsonGenerator jg = objectMapper.getFactory().createGenerator(outputStream)) {
jg.writeStartObject();
for (final Map.Entry<String, TaskReport> entry : reports.entrySet()) {
jg.writeFieldName(entry.getKey());
JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, entry.getValue());
}
jg.writeEndObject();
}
} }
} }
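writeReportToStream now simply delegates to Jackson; the type field is emitted because ReportMap declares its values as TaskReport, which carries the @JsonTypeInfo annotation. A hedged round-trip sketch (a bare ObjectMapper is used here for brevity; Druid passes in the configured jsonMapper):

ObjectMapper mapper = new ObjectMapper();
TaskReport.ReportMap reports = TaskReport.buildTaskReports(
    new KillTaskReport("kill_1", new KillTaskReport.Stats(0, 1, 10)));

ByteArrayOutputStream out = new ByteArrayOutputStream();
SingleFileTaskReportFileWriter.writeReportToStream(mapper, out, reports);

// The JSON now contains each report's "type" field, so it can be read straight back
// into concrete TaskReport implementations without a custom serializer or TypeReference.
TaskReport.ReportMap roundTripped = mapper.readValue(out.toByteArray(), TaskReport.ReportMap.class);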

View File

@ -21,9 +21,9 @@ package org.apache.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map;
/** /**
* TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and * TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and
@ -31,7 +31,10 @@ import java.util.Map;
*/ */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "ingestionStatsAndErrors", value = IngestionStatsAndErrorsTaskReport.class), @JsonSubTypes.Type(
name = IngestionStatsAndErrorsTaskReport.REPORT_KEY,
value = IngestionStatsAndErrorsTaskReport.class
),
@JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value = KillTaskReport.class) @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value = KillTaskReport.class)
}) })
public interface TaskReport public interface TaskReport
@ -48,13 +51,29 @@ public interface TaskReport
/** /**
* Returns an order-preserving map that is suitable for passing into {@link TaskReportFileWriter#write}. * Returns an order-preserving map that is suitable for passing into {@link TaskReportFileWriter#write}.
*/ */
static Map<String, TaskReport> buildTaskReports(TaskReport... taskReports) static ReportMap buildTaskReports(TaskReport... taskReports)
{ {
// Use LinkedHashMap to preserve order of the reports. ReportMap taskReportMap = new ReportMap();
Map<String, TaskReport> taskReportMap = new LinkedHashMap<>();
for (TaskReport taskReport : taskReports) { for (TaskReport taskReport : taskReports) {
taskReportMap.put(taskReport.getReportKey(), taskReport); taskReportMap.put(taskReport.getReportKey(), taskReport);
} }
return taskReportMap; return taskReportMap;
} }
/**
* Represents an ordered map from report key to a TaskReport that is compatible
* for writing out reports to files or serving over HTTP.
* <p>
* This class is needed for Jackson serde to work correctly. Without this class,
* a TaskReport is serialized without the type information and cannot be
* deserialized back into a concrete implementation.
*/
class ReportMap extends LinkedHashMap<String, TaskReport>
{
@SuppressWarnings("unchecked")
public <T extends TaskReport> Optional<T> findReport(String reportKey)
{
return Optional.fromNullable((T) get(reportKey));
}
}
} }
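Intended usage of the new ReportMap API (the report key constant comes from this diff; the Stats getter name is assumed):

TaskReport.ReportMap reports = TaskReport.buildTaskReports(
    new KillTaskReport("kill_1", new KillTaskReport.Stats(9, 9, 0)));

// findReport() replaces unchecked casts on raw map lookups; it returns Guava's
// Optional, matching the com.google.common.base.Optional import added above.
Optional<KillTaskReport> killReport = reports.findReport(KillTaskReport.REPORT_KEY);
if (killReport.isPresent()) {
  KillTaskReport.Stats stats = killReport.get().getStats();
}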

View File

@ -21,11 +21,9 @@ package org.apache.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
public interface TaskReportFileWriter public interface TaskReportFileWriter
{ {
void write(String taskId, Map<String, TaskReport> reports); void write(String taskId, TaskReport.ReportMap reports);
void setObjectMapper(ObjectMapper objectMapper); void setObjectMapper(ObjectMapper objectMapper);
} }

View File

@ -903,11 +903,35 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
return null; return null;
} }
protected TaskReport.ReportMap buildLiveIngestionStatsReport(
IngestionState ingestionState,
Map<String, Object> unparseableEvents,
Map<String, Object> rowStats
)
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
getId(),
new IngestionStatsAndErrors(
ingestionState,
unparseableEvents,
rowStats,
null,
false,
0L,
null,
null,
null
)
)
);
}
/** /**
* Builds a singleton map with {@link IngestionStatsAndErrorsTaskReport#REPORT_KEY} * Builds a singleton map with {@link IngestionStatsAndErrorsTaskReport#REPORT_KEY}
* as key and an {@link IngestionStatsAndErrorsTaskReport} for this task as value. * as key and an {@link IngestionStatsAndErrorsTaskReport} for this task as value.
*/ */
protected Map<String, TaskReport> buildIngestionStatsReport( protected TaskReport.ReportMap buildIngestionStatsReport(
IngestionState ingestionState, IngestionState ingestionState,
String errorMessage, String errorMessage,
Long segmentsRead, Long segmentsRead,

View File

@ -609,7 +609,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
* *
* @return Map of reports for the task. * @return Map of reports for the task.
*/ */
private Map<String, TaskReport> getTaskCompletionReports() private TaskReport.ReportMap getTaskCompletionReports()
{ {
return TaskReport.buildTaskReports( return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport( new IngestionStatsAndErrorsTaskReport(

View File

@ -502,7 +502,7 @@ public class CompactionTask extends AbstractBatchIndexTask
log.info("Generated [%d] compaction task specs", totalNumSpecs); log.info("Generated [%d] compaction task specs", totalNumSpecs);
int failCnt = 0; int failCnt = 0;
Map<String, TaskReport> completionReports = new HashMap<>(); final TaskReport.ReportMap completionReports = new TaskReport.ReportMap();
for (int i = 0; i < indexTaskSpecs.size(); i++) { for (int i = 0; i < indexTaskSpecs.size(); i++) {
ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i); ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
@ -521,9 +521,11 @@ public class CompactionTask extends AbstractBatchIndexTask
} }
String reportKeySuffix = "_" + i; String reportKeySuffix = "_" + i;
Optional.ofNullable(eachSpec.getCompletionReports()) Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
.ifPresent(reports -> completionReports.putAll( reports -> completionReports.putAll(
CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix))); CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix)
)
);
} else { } else {
failCnt++; failCnt++;
log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json); log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json);
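A conceptual equivalent of the CollectionUtils.mapKeys call above, for illustration only: each sub-spec's report keys get an index suffix before being merged into the compaction task's combined report map (the KillTaskReport here is just a stand-in report type):

TaskReport.ReportMap combined = new TaskReport.ReportMap();
TaskReport.ReportMap subTaskReports = TaskReport.buildTaskReports(
    new KillTaskReport("kill_0", new KillTaskReport.Stats(1, 1, 0)));

String suffix = "_0";  // "_" + index of the sub-spec, as in the loop above
subTaskReports.forEach((key, report) -> combined.put(key + suffix, report));
// combined now maps KillTaskReport.REPORT_KEY + "_0" -> the sub-spec's report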

View File

@ -680,7 +680,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
return Response.ok(returnMap).build(); return Response.ok(returnMap).build();
} }
private Map<String, TaskReport> getTaskCompletionReports() private TaskReport.ReportMap getTaskCompletionReports()
{ {
return buildIngestionStatsReport(ingestionState, errorMsg, null, null); return buildIngestionStatsReport(ingestionState, errorMsg, null, null);
} }

View File

@ -187,7 +187,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@Nullable @Nullable
private String errorMsg; private String errorMsg;
private Map<String, TaskReport> completionReports; private TaskReport.ReportMap completionReports;
@JsonCreator @JsonCreator
public IndexTask( public IndexTask(
@ -320,7 +320,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@Nullable @Nullable
@JsonIgnore @JsonIgnore
public Map<String, TaskReport> getCompletionReports() public TaskReport.ReportMap getCompletionReports()
{ {
return completionReports; return completionReports;
} }
@ -415,21 +415,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
) )
{ {
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
Map<String, Object> events = getTaskCompletionUnparseableEvents();
payload.put("ingestionState", ingestionState); final TaskReport.ReportMap liveReports = buildLiveIngestionStatsReport(
payload.put("unparseableEvents", events); ingestionState,
payload.put("rowStats", doGetRowStats(full != null)); getTaskCompletionUnparseableEvents(),
doGetRowStats(full != null)
ingestionStatsAndErrors.put("taskId", getId()); );
ingestionStatsAndErrors.put("payload", payload); return Response.ok(liveReports).build();
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
return Response.ok(returnMap).build();
} }
@JsonProperty("spec") @JsonProperty("spec")
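For readability, the new body of the live-reports endpoint in isolation: the hand-assembled nested maps are replaced by a single call to the buildLiveIngestionStatsReport helper introduced in AbstractBatchIndexTask above.

final TaskReport.ReportMap liveReports = buildLiveIngestionStatsReport(
    ingestionState,
    getTaskCompletionUnparseableEvents(),
    doGetRowStats(full != null)
);
return Response.ok(liveReports).build();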

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskReport;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is * Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is
@ -38,7 +37,7 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport
GeneratedPartitionsMetadataReport( GeneratedPartitionsMetadataReport(
@JsonProperty("taskId") String taskId, @JsonProperty("taskId") String taskId,
@JsonProperty("partitionStats") List<PartitionStat> partitionStats, @JsonProperty("partitionStats") List<PartitionStat> partitionStats,
@JsonProperty("taskReport") Map<String, TaskReport> taskReport @JsonProperty("taskReport") TaskReport.ReportMap taskReport
) )
{ {
super(taskId, partitionStats, taskReport); super(taskId, partitionStats, taskReport);

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskReport;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
/** /**
@ -35,9 +34,9 @@ public class GeneratedPartitionsReport implements SubTaskReport
{ {
private final String taskId; private final String taskId;
private final List<PartitionStat> partitionStats; private final List<PartitionStat> partitionStats;
private final Map<String, TaskReport> taskReport; private final TaskReport.ReportMap taskReport;
GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats, Map<String, TaskReport> taskReport) GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats, TaskReport.ReportMap taskReport)
{ {
this.taskId = taskId; this.taskId = taskId;
this.partitionStats = partitionStats; this.partitionStats = partitionStats;
@ -52,7 +51,7 @@ public class GeneratedPartitionsReport implements SubTaskReport
} }
@JsonProperty @JsonProperty
public Map<String, TaskReport> getTaskReport() public TaskReport.ReportMap getTaskReport()
{ {
return taskReport; return taskReport;
} }

View File

@ -202,7 +202,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private volatile Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats; private volatile Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;
private IngestionState ingestionState; private IngestionState ingestionState;
private Map<String, TaskReport> completionReports; private TaskReport.ReportMap completionReports;
private Long segmentsRead; private Long segmentsRead;
private Long segmentsPublished; private Long segmentsPublished;
private final boolean isCompactionTask; private final boolean isCompactionTask;
@ -300,7 +300,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Nullable @Nullable
@JsonIgnore @JsonIgnore
public Map<String, TaskReport> getCompletionReports() public TaskReport.ReportMap getCompletionReports()
{ {
return completionReports; return completionReports;
} }
@ -1238,7 +1238,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
/** /**
* Generate an IngestionStatsAndErrorsTaskReport for the task. * Generate an IngestionStatsAndErrorsTaskReport for the task.
*/ */
private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus) private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
{ {
return buildIngestionStatsReport( return buildIngestionStatsReport(
IngestionState.COMPLETED, IngestionState.COMPLETED,
@ -1602,7 +1602,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
// Get stats from completed tasks // Get stats from completed tasks
Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports(); Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) { for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport(); TaskReport.ReportMap taskReport = pushedSegmentsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) { if (taskReport == null || taskReport.isEmpty()) {
LOG.warn("Received an empty report from subtask[%s]" + pushedSegmentsReport.getTaskId()); LOG.warn("Received an empty report from subtask[%s]" + pushedSegmentsReport.getTaskId());
continue; continue;
@ -1642,7 +1642,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>(); final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
long totalSegmentsRead = 0L; long totalSegmentsRead = 0L;
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) { for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport(); TaskReport.ReportMap taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) { if (taskReport == null || taskReport.isEmpty()) {
LOG.warn("Received an empty report from subtask[%s]", generatedPartitionsReport.getTaskId()); LOG.warn("Received an empty report from subtask[%s]", generatedPartitionsReport.getTaskId());
continue; continue;
@ -1726,7 +1726,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
} }
private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport( private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
Map<String, TaskReport> taskReport, TaskReport.ReportMap taskReport,
List<ParseExceptionReport> unparseableEvents List<ParseExceptionReport> unparseableEvents
) )
{ {
@ -1804,12 +1804,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
} }
@VisibleForTesting @VisibleForTesting
public Map<String, Object> doGetLiveReports(boolean isFullReport) public TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
{ {
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents = Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents =
doGetRowStatsAndUnparseableEvents(isFullReport, true); doGetRowStatsAndUnparseableEvents(isFullReport, true);
@ -1824,16 +1820,11 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
: currentSequentialTask.getIngestionState(); : currentSequentialTask.getIngestionState();
} }
payload.put("ingestionState", ingestionStateForReport); return buildLiveIngestionStatsReport(
payload.put("unparseableEvents", rowStatsAndUnparsebleEvents.rhs); ingestionStateForReport,
payload.put("rowStats", rowStatsAndUnparsebleEvents.lhs); rowStatsAndUnparsebleEvents.rhs,
rowStatsAndUnparsebleEvents.lhs
ingestionStatsAndErrors.put("taskId", getId()); );
ingestionStatsAndErrors.put("payload", payload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
return returnMap;
} }
@GET @GET

View File

@ -177,7 +177,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
} }
@Override @Override
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport) GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, TaskReport.ReportMap taskReport)
{ {
List<PartitionStat> partitionStats = segments.stream() List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))

View File

@ -192,7 +192,7 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
} }
@Override @Override
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport) GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, TaskReport.ReportMap taskReport)
{ {
List<PartitionStat> partitionStats = segments.stream() List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))

View File

@ -125,7 +125,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
toolbox.getIndexingTmpDir() toolbox.getIndexingTmpDir()
); );
Map<String, TaskReport> taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource)); TaskReport.ReportMap taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource));
taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport)); taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport));
@ -146,7 +146,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
abstract T createGeneratedPartitionsReport( abstract T createGeneratedPartitionsReport(
TaskToolbox toolbox, TaskToolbox toolbox,
List<DataSegment> segments, List<DataSegment> segments,
Map<String, TaskReport> taskReport TaskReport.ReportMap taskReport
); );
private Long getNumSegmentsRead(InputSource inputSource) private Long getNumSegmentsRead(InputSource inputSource)
@ -249,7 +249,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
/** /**
* Generate an IngestionStatsAndErrorsTaskReport for the task. * Generate an IngestionStatsAndErrorsTaskReport for the task.
*/ */
private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead) private TaskReport.ReportMap getTaskCompletionReports(Long segmentsRead)
{ {
return buildIngestionStatsReport( return buildIngestionStatsReport(
IngestionState.COMPLETED, IngestionState.COMPLETED,

View File

@ -22,12 +22,12 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction; import org.apache.druid.indexing.common.actions.SurrogateAction;
@ -189,7 +189,9 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
intervalToUnzippedFiles intervalToUnzippedFiles
); );
taskClient.report(new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments, ImmutableMap.of())); taskClient.report(
new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments, new TaskReport.ReportMap())
);
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }

View File

@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -41,14 +40,14 @@ public class PushedSegmentsReport implements SubTaskReport
private final String taskId; private final String taskId;
private final Set<DataSegment> oldSegments; private final Set<DataSegment> oldSegments;
private final Set<DataSegment> newSegments; private final Set<DataSegment> newSegments;
private final Map<String, TaskReport> taskReport; private final TaskReport.ReportMap taskReport;
@JsonCreator @JsonCreator
public PushedSegmentsReport( public PushedSegmentsReport(
@JsonProperty("taskId") String taskId, @JsonProperty("taskId") String taskId,
@JsonProperty("oldSegments") Set<DataSegment> oldSegments, @JsonProperty("oldSegments") Set<DataSegment> oldSegments,
@JsonProperty("segments") Set<DataSegment> newSegments, @JsonProperty("segments") Set<DataSegment> newSegments,
@JsonProperty("taskReport") Map<String, TaskReport> taskReport @JsonProperty("taskReport") TaskReport.ReportMap taskReport
) )
{ {
this.taskId = Preconditions.checkNotNull(taskId, "taskId"); this.taskId = Preconditions.checkNotNull(taskId, "taskId");
@ -77,7 +76,7 @@ public class PushedSegmentsReport implements SubTaskReport
} }
@JsonProperty("taskReport") @JsonProperty("taskReport")
public Map<String, TaskReport> getTaskReport() public TaskReport.ReportMap getTaskReport()
{ {
return taskReport; return taskReport;
} }

View File

@ -282,7 +282,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
.transform(PartitionChunk::getObject) .transform(PartitionChunk::getObject)
.toSet(); .toSet();
Map<String, TaskReport> taskReport = getTaskCompletionReports(); TaskReport.ReportMap taskReport = getTaskCompletionReports();
taskClient.report(new PushedSegmentsReport(getId(), oldSegments, pushedSegments, taskReport)); taskClient.report(new PushedSegmentsReport(getId(), oldSegments, pushedSegments, taskReport));
toolbox.getTaskReportFileWriter().write(getId(), taskReport); toolbox.getTaskReportFileWriter().write(getId(), taskReport);
@ -542,23 +542,13 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
return Response.ok(doGetRowStats(full != null)).build(); return Response.ok(doGetRowStats(full != null)).build();
} }
private Map<String, Object> doGetLiveReports(boolean isFullReport) private TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
{ {
Map<String, Object> returnMap = new HashMap<>(); return buildLiveIngestionStatsReport(
Map<String, Object> ingestionStatsAndErrors = new HashMap<>(); ingestionState,
Map<String, Object> payload = new HashMap<>(); getTaskCompletionUnparseableEvents(),
Map<String, Object> events = getTaskCompletionUnparseableEvents(); doGetRowStats(isFullReport)
);
payload.put("ingestionState", ingestionState);
payload.put("unparseableEvents", events);
payload.put("rowStats", doGetRowStats(isFullReport));
ingestionStatsAndErrors.put("taskId", getId());
ingestionStatsAndErrors.put("payload", payload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
return returnMap;
} }
@GET @GET
@ -585,7 +575,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
/** /**
* Generate an IngestionStatsAndErrorsTaskReport for the task. * Generate an IngestionStatsAndErrorsTaskReport for the task.
*/ */
private Map<String, TaskReport> getTaskCompletionReports() private TaskReport.ReportMap getTaskCompletionReports()
{ {
return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, null); return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, null);
} }

View File

@ -1118,7 +1118,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
* @param handoffWaitMs Milliseconds waited for segments to be handed off. * @param handoffWaitMs Milliseconds waited for segments to be handed off.
* @return Map of reports for the task. * @return Map of reports for the task.
*/ */
private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorMsg, long handoffWaitMs) private TaskReport.ReportMap getTaskCompletionReports(@Nullable String errorMsg, long handoffWaitMs)
{ {
return TaskReport.buildTaskReports( return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport( new IngestionStatsAndErrorsTaskReport(

View File

@ -1678,9 +1678,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
private IngestionStatsAndErrors getTaskReportData() throws IOException private IngestionStatsAndErrors getTaskReportData() throws IOException
{ {
Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue( TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue(
reportsFile, reportsFile,
new TypeReference<Map<String, TaskReport>>() new TypeReference<TaskReport.ReportMap>()
{ {
} }
); );

View File

@ -2548,9 +2548,9 @@ public class IndexTaskTest extends IngestionTestBase
private IngestionStatsAndErrors getTaskReportData() throws IOException private IngestionStatsAndErrors getTaskReportData() throws IOException
{ {
Map<String, TaskReport> taskReports = jsonMapper.readValue( TaskReport.ReportMap taskReports = jsonMapper.readValue(
taskRunner.getTaskReportsFile(), taskRunner.getTaskReportsFile(),
new TypeReference<Map<String, TaskReport>>() new TypeReference<TaskReport.ReportMap>()
{ {
} }
); );

View File

@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task; package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@ -270,9 +269,6 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
/** /**
* Converts ParseSpec to InputFormat for indexing tests. To be used until {@link FirehoseFactory} * Converts ParseSpec to InputFormat for indexing tests. To be used until {@link FirehoseFactory}
* & {@link InputRowParser} is deprecated and removed. * & {@link InputRowParser} is deprecated and removed.
*
* @param parseSpec
* @return
*/ */
public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec) public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec)
{ {
@ -510,11 +506,9 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
} }
} }
public Map<String, TaskReport> getReports() throws IOException public TaskReport.ReportMap getReports() throws IOException
{ {
return objectMapper.readValue(reportsFile, new TypeReference<Map<String, TaskReport>>() return objectMapper.readValue(reportsFile, TaskReport.ReportMap.class);
{
});
} }
public List<IngestionStatsAndErrors> getIngestionReports() throws IOException public List<IngestionStatsAndErrors> getIngestionReports() throws IOException

View File

@ -1134,7 +1134,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
try { try {
Object payload = getObjectMapper().readValue( Object payload = getObjectMapper().readValue(
taskRunner.getTaskReportsFile(), taskRunner.getTaskReportsFile(),
new TypeReference<Map<String, TaskReport>>() new TypeReference<TaskReport.ReportMap>()
{ {
} }
).get(KillTaskReport.REPORT_KEY).getPayload(); ).get(KillTaskReport.REPORT_KEY).getPayload();

View File

@ -23,12 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter; import org.apache.druid.indexing.common.TaskReportFileWriter;
import java.util.Map;
public class NoopTestTaskReportFileWriter implements TaskReportFileWriter public class NoopTestTaskReportFileWriter implements TaskReportFileWriter
{ {
@Override @Override
public void write(String id, Map<String, TaskReport> reports) public void write(String id, TaskReport.ReportMap reports)
{ {
} }

View File

@ -28,6 +28,7 @@ import com.google.common.io.Files;
import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.TestUtils;
@ -38,7 +39,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File; import java.io.File;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Collections;
public class TaskReportSerdeTest public class TaskReportSerdeTest
{ {
@ -55,47 +56,56 @@ public class TaskReportSerdeTest
} }
@Test @Test
public void testSerde() throws Exception public void testSerdeOfIngestionReport() throws Exception
{ {
IngestionStatsAndErrorsTaskReport report1 = new IngestionStatsAndErrorsTaskReport( IngestionStatsAndErrorsTaskReport originalReport = buildTestIngestionReport();
"testID", String reportJson = jsonMapper.writeValueAsString(originalReport);
new IngestionStatsAndErrors( TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
IngestionState.BUILD_SEGMENTS,
ImmutableMap.of(
"hello", "world"
),
ImmutableMap.of(
"number", 1234
),
"an error message",
true,
1000L,
ImmutableMap.of("PartitionA", 5000L),
5L,
10L
)
);
String report1serialized = jsonMapper.writeValueAsString(report1);
IngestionStatsAndErrorsTaskReport report2 = (IngestionStatsAndErrorsTaskReport) jsonMapper.readValue(
report1serialized,
TaskReport.class
);
Assert.assertEquals(report1, report2);
Assert.assertEquals(report1.hashCode(), report2.hashCode());
Assert.assertTrue(deserialized instanceof IngestionStatsAndErrorsTaskReport);
IngestionStatsAndErrorsTaskReport deserializedReport = (IngestionStatsAndErrorsTaskReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
}
@Test
public void testSerdeOfKillTaskReport() throws Exception
{
KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3));
String reportJson = jsonMapper.writeValueAsString(originalReport);
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
Assert.assertTrue(deserialized instanceof KillTaskReport);
KillTaskReport deserializedReport = (KillTaskReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
}
@Test
public void testWriteReportMapToFileAndRead() throws Exception
{
IngestionStatsAndErrorsTaskReport report1 = buildTestIngestionReport();
final File reportFile = temporaryFolder.newFile(); final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper); writer.setObjectMapper(jsonMapper);
Map<String, TaskReport> reportMap1 = TaskReport.buildTaskReports(report1); TaskReport.ReportMap reportMap1 = TaskReport.buildTaskReports(report1);
writer.write("testID", reportMap1); writer.write("testID", reportMap1);
Map<String, TaskReport> reportMap2 = jsonMapper.readValue( TaskReport.ReportMap reportMap2 = jsonMapper.readValue(reportFile, TaskReport.ReportMap.class);
reportFile,
new TypeReference<Map<String, TaskReport>>() {}
);
Assert.assertEquals(reportMap1, reportMap2); Assert.assertEquals(reportMap1, reportMap2);
} }
@Test
public void testWriteReportMapToStringAndRead() throws Exception
{
IngestionStatsAndErrorsTaskReport ingestionReport = buildTestIngestionReport();
TaskReport.ReportMap reportMap = TaskReport.buildTaskReports(ingestionReport);
String json = jsonMapper.writeValueAsString(reportMap);
TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(json, TaskReport.ReportMap.class);
Assert.assertEquals(reportMap, deserializedReportMap);
}
@Test @Test
public void testSerializationOnMissingPartitionStats() throws Exception public void testSerializationOnMissingPartitionStats() throws Exception
{ {
@ -150,7 +160,7 @@ public class TaskReportSerdeTest
final File reportFile = temporaryFolder.newFile(); final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper); writer.setObjectMapper(jsonMapper);
writer.write("theTask", ImmutableMap.of("report", new ExceptionalTaskReport())); writer.write("theTask", TaskReport.buildTaskReports(new ExceptionalTaskReport()));
// Read the file, ensure it's incomplete and not valid JSON. This allows callers to determine the report was // Read the file, ensure it's incomplete and not valid JSON. This allows callers to determine the report was
// not complete when written. // not complete when written.
@ -160,6 +170,24 @@ public class TaskReportSerdeTest
); );
} }
private IngestionStatsAndErrorsTaskReport buildTestIngestionReport()
{
return new IngestionStatsAndErrorsTaskReport(
"testID",
new IngestionStatsAndErrors(
IngestionState.BUILD_SEGMENTS,
Collections.singletonMap("hello", "world"),
Collections.singletonMap("number", 1234),
"an error message",
true,
1000L,
Collections.singletonMap("PartitionA", 5000L),
5L,
10L
)
);
}
/** /**
* Task report that throws an exception while being serialized. * Task report that throws an exception while being serialized.
*/ */

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.task.batch.parallel; package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.LocalInputSource;
@ -31,6 +30,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.input.DruidInputSource;
@ -66,7 +66,6 @@ import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
@SuppressWarnings("SameParameterValue") @SuppressWarnings("SameParameterValue")
@ -183,10 +182,10 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
return getIndexingServiceClient().getPublishedSegments(task); return getIndexingServiceClient().getPublishedSegments(task);
} }
Map<String, Object> runTaskAndGetReports(Task task, TaskState expectedTaskStatus) TaskReport.ReportMap runTaskAndGetReports(Task task, TaskState expectedTaskStatus)
{ {
runTaskAndVerifyStatus(task, expectedTaskStatus); runTaskAndVerifyStatus(task, expectedTaskStatus);
return FutureUtils.getUnchecked(getIndexingServiceClient().taskReportAsMap(task.getId()), true); return getIndexingServiceClient().getLiveReportsForTask(task.getId());
} }
protected ParallelIndexSupervisorTask createTask( protected ParallelIndexSupervisorTask createTask(

View File

@ -49,10 +49,13 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -118,7 +121,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -546,12 +548,17 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
     @Override
     public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
+    {
+      return Futures.immediateFuture(null);
+    }
+
+    public TaskReport.ReportMap getLiveReportsForTask(String taskId)
     {
       final Optional<Task> task = getTaskStorage().getTask(taskId);
       if (!task.isPresent()) {
        return null;
       }
-      return Futures.immediateFuture(((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true));
+      return ((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true);
     }

     public TaskContainer getTaskContainer(String taskId)
@@ -773,20 +780,16 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
     }
   }

-  protected Map<String, Object> buildExpectedTaskReportSequential(
+  protected TaskReport.ReportMap buildExpectedTaskReportSequential(
       String taskId,
       List<ParseExceptionReport> expectedUnparseableEvents,
       RowIngestionMetersTotals expectedDeterminePartitions,
       RowIngestionMetersTotals expectedTotals
   )
   {
-    final Map<String, Object> payload = new HashMap<>();
-    payload.put("ingestionState", IngestionState.COMPLETED);
-    payload.put(
-        "unparseableEvents",
-        ImmutableMap.of("determinePartitions", ImmutableList.of(), "buildSegments", expectedUnparseableEvents)
-    );
+    final Map<String, Object> unparseableEvents =
+        ImmutableMap.of("determinePartitions", ImmutableList.of(), "buildSegments", expectedUnparseableEvents);

     Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
         "processed", 0.0,
         "processedBytes", 0.0,
@@ -801,72 +804,90 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
         "15m", emptyAverageMinuteMap
     );

-    payload.put(
-        "rowStats",
-        ImmutableMap.of(
+    final Map<String, Object> rowStats = ImmutableMap.of(
         "movingAverages",
         ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", emptyAverages),
         "totals",
         ImmutableMap.of("determinePartitions", expectedDeterminePartitions, "buildSegments", expectedTotals)
-        )
     );

-    final Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
-    ingestionStatsAndErrors.put("taskId", taskId);
-    ingestionStatsAndErrors.put("payload", payload);
-    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-    return Collections.singletonMap("ingestionStatsAndErrors", ingestionStatsAndErrors);
+    return TaskReport.buildTaskReports(
+        new IngestionStatsAndErrorsTaskReport(
+            taskId,
+            new IngestionStatsAndErrors(
+                IngestionState.COMPLETED,
+                unparseableEvents,
+                rowStats,
+                null,
+                false,
+                0L,
+                null,
+                null,
+                null
+            )
+        )
+    );
   }

-  protected Map<String, Object> buildExpectedTaskReportParallel(
+  protected TaskReport.ReportMap buildExpectedTaskReportParallel(
       String taskId,
       List<ParseExceptionReport> expectedUnparseableEvents,
       RowIngestionMetersTotals expectedTotals
   )
   {
-    Map<String, Object> returnMap = new HashMap<>();
-    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
-    Map<String, Object> payload = new HashMap<>();
-    payload.put("ingestionState", IngestionState.COMPLETED);
-    payload.put("unparseableEvents", ImmutableMap.of("buildSegments", expectedUnparseableEvents));
-    payload.put("rowStats", ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals)));
-    ingestionStatsAndErrors.put("taskId", taskId);
-    ingestionStatsAndErrors.put("payload", payload);
-    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
-    return returnMap;
+    Map<String, Object> unparseableEvents = ImmutableMap.of("buildSegments", expectedUnparseableEvents);
+    Map<String, Object> rowStats = ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals));
+    return TaskReport.buildTaskReports(
+        new IngestionStatsAndErrorsTaskReport(
+            taskId,
+            new IngestionStatsAndErrors(
+                IngestionState.COMPLETED,
+                unparseableEvents,
+                rowStats,
+                null,
+                false,
+                0L,
+                null,
+                null,
+                null
+            )
+        )
+    );
   }

   protected void compareTaskReports(
-      Map<String, Object> expectedReports,
-      Map<String, Object> actualReports
+      TaskReport.ReportMap expectedReports,
+      TaskReport.ReportMap actualReports
   )
   {
-    expectedReports = (Map<String, Object>) expectedReports.get("ingestionStatsAndErrors");
-    actualReports = (Map<String, Object>) actualReports.get("ingestionStatsAndErrors");
-    Assert.assertEquals(expectedReports.get("taskId"), actualReports.get("taskId"));
-    Assert.assertEquals(expectedReports.get("type"), actualReports.get("type"));
-    Map<String, Object> expectedPayload = (Map<String, Object>) expectedReports.get("payload");
-    Map<String, Object> actualPayload = (Map<String, Object>) actualReports.get("payload");
-    Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState"));
-    Map<String, Object> expectedTotals = (Map<String, Object>) expectedPayload.get("totals");
-    Map<String, Object> actualTotals = (Map<String, Object>) actualReports.get("totals");
+    final Optional<IngestionStatsAndErrorsTaskReport> expectedReportOptional
+        = expectedReports.findReport("ingestionStatsAndErrors");
+    final Optional<IngestionStatsAndErrorsTaskReport> actualReportOptional
+        = actualReports.findReport("ingestionStatsAndErrors");
+    Assert.assertTrue(expectedReportOptional.isPresent());
+    Assert.assertTrue(actualReportOptional.isPresent());
+    final IngestionStatsAndErrorsTaskReport expectedReport = expectedReportOptional.get();
+    final IngestionStatsAndErrorsTaskReport actualReport = actualReportOptional.get();
+    Assert.assertEquals(expectedReport.getTaskId(), actualReport.getTaskId());
+    Assert.assertEquals(expectedReport.getReportKey(), actualReport.getReportKey());
+    final IngestionStatsAndErrors expectedPayload = expectedReport.getPayload();
+    final IngestionStatsAndErrors actualPayload = actualReport.getPayload();
+    Assert.assertEquals(expectedPayload.getIngestionState(), actualPayload.getIngestionState());
+    Map<String, Object> expectedTotals = expectedPayload.getRowStats();
+    Map<String, Object> actualTotals = actualPayload.getRowStats();
     Assert.assertEquals(expectedTotals, actualTotals);

     List<ParseExceptionReport> expectedParseExceptionReports =
-        (List<ParseExceptionReport>) ((Map<String, Object>)
-            expectedPayload.get("unparseableEvents")).get("buildSegments");
+        (List<ParseExceptionReport>) (expectedPayload.getUnparseableEvents()).get("buildSegments");
     List<ParseExceptionReport> actualParseExceptionReports =
-        (List<ParseExceptionReport>) ((Map<String, Object>)
-            actualPayload.get("unparseableEvents")).get("buildSegments");
+        (List<ParseExceptionReport>) (actualPayload.getUnparseableEvents()).get("buildSegments");
     List<String> expectedMessages = expectedParseExceptionReports
         .stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList());
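Note on the pattern above: these helpers now build a TaskReport.ReportMap from typed report objects and look reports up by key, instead of nesting Map<String, Object>. A minimal sketch of that round trip, assuming the same imports as this test class; the method name, the empty maps, and the trailing null/false/0L arguments simply mirror the fixtures in this diff and are illustrative only:

  // Illustrative sketch, not part of the commit.
  private static void reportMapRoundTripExample(String taskId)
  {
    // Build the report map from a typed report, as buildExpectedTaskReport* does above.
    TaskReport.ReportMap reports = TaskReport.buildTaskReports(
        new IngestionStatsAndErrorsTaskReport(
            taskId,
            new IngestionStatsAndErrors(
                IngestionState.COMPLETED,
                ImmutableMap.of(),   // unparseable events keyed by ingestion phase
                ImmutableMap.of(),   // row stats keyed by ingestion phase
                null, false, 0L, null, null, null
            )
        )
    );

    // Look the report up by its key, as compareTaskReports does above; no unchecked casts needed.
    Optional<IngestionStatsAndErrorsTaskReport> found = reports.findReport("ingestionStatsAndErrors");
    Assert.assertTrue(found.isPresent());
    Assert.assertEquals(IngestionState.COMPLETED, found.get().getPayload().getIngestionState());
  }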
View File
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
@@ -43,7 +44,6 @@ import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.Arrays;
-import java.util.Map;

 public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhaseParallelIndexingTest
 {
@@ -133,8 +133,8 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa
         false
     );
-    final RowIngestionMetersTotals expectedTotals = RowMeters.with().totalProcessed(200);
-    final Map<String, Object> expectedReports =
+    final RowIngestionMetersTotals expectedTotals = RowMeters.with().bytes(5630).totalProcessed(200);
+    final TaskReport.ReportMap expectedReports =
         maxNumConcurrentSubTasks <= 1
         ? buildExpectedTaskReportSequential(
             task.getId(),
@@ -148,7 +148,7 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa
             expectedTotals
         );
-    Map<String, Object> actualReports = runTaskAndGetReports(task, TaskState.SUCCESS);
+    TaskReport.ReportMap actualReports = runTaskAndGetReports(task, TaskState.SUCCESS);
     compareTaskReports(expectedReports, actualReports);
   }
@@ -169,12 +169,12 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa
         false,
         false
     );
-    Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
+    TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
         task.getId(),
         ImmutableList.of(),
-        new RowIngestionMetersTotals(200, 0, 0, 0, 0)
+        new RowIngestionMetersTotals(200, 5630, 0, 0, 0)
     );
-    Map<String, Object> actualReports = runTaskAndGetReports(task, TaskState.SUCCESS);
+    TaskReport.ReportMap actualReports = runTaskAndGetReports(task, TaskState.SUCCESS);
     compareTaskReports(expectedReports, actualReports);
   }
 }
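The only numeric change in this file is that the expected totals now also assert processedBytes (5630). For readers comparing the two spellings used above, the builder and constructor forms are presumably equivalent; the parameter order noted below is an assumption inferred from the builder calls, not something this diff states:

  // Sketch: the two ways this test writes the same expected totals.
  // Assumed order: (processed, processedBytes, processedWithError, thrownAway, unparseable).
  RowIngestionMetersTotals viaBuilder = RowMeters.with().bytes(5630).totalProcessed(200);
  RowIngestionMetersTotals viaConstructor = new RowIngestionMetersTotals(200, 5630, 0, 0, 0);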
View File
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSplit;
@@ -35,6 +34,7 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -713,7 +713,12 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
       );
       taskClient.report(
-          new PushedSegmentsReport(getId(), Collections.emptySet(), Collections.singleton(segment), ImmutableMap.of())
+          new PushedSegmentsReport(
+              getId(),
+              Collections.emptySet(),
+              Collections.singleton(segment),
+              new TaskReport.ReportMap()
+          )
       );
       return TaskStatus.fromCode(getId(), state);
     }
View File
@@ -20,6 +20,8 @@
 package org.apache.druid.indexing.common.task.batch.parallel;

 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.indexing.common.KillTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
 import org.junit.Test;

 public class PushedSegmentsReportTest
@@ -27,6 +29,13 @@ public class PushedSegmentsReportTest
   @Test
   public void testEquals()
   {
-    EqualsVerifier.forClass(PushedSegmentsReport.class).usingGetClass().verify();
+    TaskReport.ReportMap map1 = new TaskReport.ReportMap();
+    TaskReport.ReportMap map2 = new TaskReport.ReportMap();
+    map2.put("killTaskReport", new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3)));
+
+    EqualsVerifier.forClass(PushedSegmentsReport.class)
+                  .usingGetClass()
+                  .withPrefabValues(TaskReport.ReportMap.class, map1, map2)
+                  .verify();
   }
 }
View File
@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.Tasks;
@@ -446,9 +447,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
         false,
         Collections.emptyList()
     );
-    Map<String, Object> actualReports = task.doGetLiveReports(true);
-    final long processedBytes = useInputFormatApi ? 335 : 0;
-    Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
+    TaskReport.ReportMap actualReports = task.doGetLiveReports(true);
+    TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
         task.getId(),
         ImmutableList.of(
             new ParseExceptionReport(
@@ -464,7 +464,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
                 1L
             )
         ),
-        new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1)
+        new RowIngestionMetersTotals(10, 335, 1, 1, 1)
     );
     compareTaskReports(expectedReports, actualReports);
   }
@@ -497,10 +497,9 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
     TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId());
     final ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask();
-    Map<String, Object> actualReports = executedTask.doGetLiveReports(true);
-    final long processedBytes = useInputFormatApi ? 335 : 0;
-    RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1);
+    TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true);
+    final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 335, 1, 1, 1);
     List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of(
         new ParseExceptionReport(
             "{ts=2017unparseable}",
@@ -516,7 +515,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
            )
     );
-    Map<String, Object> expectedReports;
+    TaskReport.ReportMap expectedReports;
     if (useInputFormatApi) {
       expectedReports = buildExpectedTaskReportSequential(
           task.getId(),
View File
@@ -461,9 +461,9 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
   protected IngestionStatsAndErrors getTaskReportData() throws IOException
   {
-    Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
+    TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue(
         reportsFile,
-        new TypeReference<Map<String, TaskReport>>()
+        new TypeReference<TaskReport.ReportMap>()
         {
         }
     );
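Since the commit makes the report map a first-class serializable type, a round-trip sketch may be useful here. It assumes an ObjectMapper configured like this test's OBJECT_MAPPER (i.e. with the TaskReport subtypes registered) and that TaskReport, KillTaskReport, TypeReference, and JUnit's Assert are in scope; the KillTaskReport values are borrowed from PushedSegmentsReportTest above, and the helper name is illustrative:

  // Illustrative sketch, not part of the commit.
  private static void roundTripReportMap(ObjectMapper objectMapper) throws IOException
  {
    TaskReport.ReportMap reports = new TaskReport.ReportMap();
    reports.put("killTaskReport", new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3)));

    // Serialize, then read back with the same TypeReference used by getTaskReportData().
    String json = objectMapper.writeValueAsString(reports);
    TaskReport.ReportMap roundTripped = objectMapper.readValue(json, new TypeReference<TaskReport.ReportMap>() {});

    // The map stays keyed by report key, so the typed lookup still works after deserialization.
    Assert.assertTrue(roundTripped.findReport("killTaskReport").isPresent());
  }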