diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java index b7a93de5ed0..1fa23e3d1b7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java @@ -32,7 +32,6 @@ import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation; import javax.annotation.Nullable; import java.util.List; -import java.util.Map; /** * Interface for the controller of a multi-stage query. @@ -123,6 +122,6 @@ public interface Controller List getTaskIds(); @Nullable - Map liveReports(); + TaskReport.ReportMap liveReports(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 0a32158cf9b..35aa1c79ef0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -28,8 +28,6 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.server.DruidNode; -import java.util.Map; - /** * Context used by multi-stage query controllers. * @@ -80,5 +78,5 @@ public interface ControllerContext /** * Writes controller task report. */ - void writeReports(String controllerTaskId, Map reports); + void writeReports(String controllerTaskId, TaskReport.ReportMap reports); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index aeba1be947c..6db998bfef2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -916,7 +916,7 @@ public class ControllerImpl implements Controller @Override @Nullable - public Map liveReports() + public TaskReport.ReportMap liveReports() { final QueryDefinition queryDef = queryDefRef.get(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 401d6af7072..1426726d592 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -41,8 +41,6 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.server.DruidNode; -import java.util.Map; - /** * Implementation for {@link ControllerContext} required to run multi-stage queries as indexing tasks. */ @@ -126,7 +124,7 @@ public class IndexerControllerContext implements ControllerContext } @Override - public void writeReports(String controllerTaskId, Map reports) + public void writeReports(String controllerTaskId, TaskReport.ReportMap reports) { toolbox.getTaskReportFileWriter().write(controllerTaskId, reports); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java index 22d3b31cf10..2d979fb8bda 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java @@ -45,7 +45,6 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.util.List; -import java.util.Map; public class ControllerChatHandler implements ChatHandler { @@ -189,7 +188,7 @@ public class ControllerChatHandler implements ChatHandler public Response httpGetLiveReports(@Context final HttpServletRequest req) { ChatHandlers.authorizationCheck(req, Action.WRITE, task.getDataSource(), toolbox.getAuthorizerMapper()); - final Map reports = controller.liveReports(); + final TaskReport.ReportMap reports = controller.liveReports(); if (reports == null) { return Response.status(Response.Status.NOT_FOUND).build(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java index c88f22d1b74..70ed5651300 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java @@ -53,7 +53,6 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.io.InputStream; import java.util.HashMap; -import java.util.Map; public class WorkerChatHandlerTest { @@ -88,7 +87,7 @@ public class WorkerChatHandlerTest new TaskReportFileWriter() { @Override - public void write(String taskId, Map reports) + public void write(String taskId, TaskReport.ReportMap reports) { } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java new file mode 100644 index 00000000000..81da98d6fa7 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing.client; + +import org.apache.druid.indexing.common.KillTaskReport; +import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.msq.exec.Controller; +import org.apache.druid.msq.indexing.MSQControllerTask; +import org.apache.druid.server.security.AuthorizerMapper; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.Response; + +public class ControllerChatHandlerTest +{ + @Test + public void testHttpGetLiveReports() + { + final Controller controller = Mockito.mock(Controller.class); + + TaskReport.ReportMap reportMap = new TaskReport.ReportMap(); + reportMap.put("killUnusedSegments", new KillTaskReport("kill_1", new KillTaskReport.Stats(1, 2, 3))); + + Mockito.when(controller.liveReports()) + .thenReturn(reportMap); + + MSQControllerTask task = Mockito.mock(MSQControllerTask.class); + Mockito.when(task.getDataSource()) + .thenReturn("wiki"); + Mockito.when(controller.task()) + .thenReturn(task); + + TaskToolbox toolbox = Mockito.mock(TaskToolbox.class); + Mockito.when(toolbox.getAuthorizerMapper()) + .thenReturn(new AuthorizerMapper(null)); + + ControllerChatHandler chatHandler = new ControllerChatHandler(toolbox, controller); + + HttpServletRequest httpRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(httpRequest.getAttribute(ArgumentMatchers.anyString())) + .thenReturn("allow-all"); + Response response = chatHandler.httpGetLiveReports(httpRequest); + + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(reportMap, response.getEntity()); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java index 4bc3d1075c1..df492a70056 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java @@ -57,7 +57,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; public class MSQTaskReportTest { @@ -242,9 +241,9 @@ public class MSQTaskReportTest writer.setObjectMapper(mapper); writer.write(TASK_ID, TaskReport.buildTaskReports(report)); - final Map reportMap = mapper.readValue( + final TaskReport.ReportMap reportMap = mapper.readValue( reportFile, - new TypeReference>() + new TypeReference() { } ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 5ab8932de3e..e0b0a837de4 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -88,7 +88,7 @@ public class MSQTestControllerContext implements ControllerContext private final ServiceEmitter emitter = new NoopServiceEmitter(); private Controller controller; - private Map report = null; + private TaskReport.ReportMap report = null; private final WorkerMemoryParameters workerMemoryParameters; public MSQTestControllerContext( @@ -273,14 +273,14 @@ public class MSQTestControllerContext implements ControllerContext } @Override - public void writeReports(String controllerTaskId, Map taskReport) + public void writeReports(String controllerTaskId, TaskReport.ReportMap taskReport) { if (controller != null && controller.id().equals(controllerTaskId)) { report = taskReport; } } - public Map getAllReports() + public TaskReport.ReportMap getAllReports() { return report; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java index c5f601d875e..db878d70024 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java @@ -55,7 +55,7 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient private final WorkerMemoryParameters workerMemoryParameters; private final List loadedSegmentMetadata; private final Map inMemoryControllers = new HashMap<>(); - private final Map> reports = new HashMap<>(); + private final Map reports = new HashMap<>(); private final Map inMemoryControllerTask = new HashMap<>(); private final Map inMemoryTaskStatus = new HashMap<>(); @@ -171,7 +171,7 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient // hooks to pull stuff out for testing @Nullable - public Map getReportForTask(String id) + public TaskReport.ReportMap getReportForTask(String id) { return reports.get(id); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java index e80f8e2d31f..eb4976701eb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java @@ -126,7 +126,7 @@ public class MSQTestWorkerContext implements WorkerContext final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter() { @Override - public void write(String taskId, Map reports) + public void write(String taskId, TaskReport.ReportMap reports) { } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java index f97f761166a..f78b3d1d162 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java @@ -62,6 +62,25 @@ public class KillTaskReport implements TaskReport return stats; } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KillTaskReport that = (KillTaskReport) o; + return Objects.equals(taskId, that.taskId) && Objects.equals(stats, that.stats); + } + + @Override + public int hashCode() + { + return Objects.hash(taskId, stats); + } + public static class Stats { private final int numSegmentsKilled; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java index 18313ccd79b..e6bc5e67f3a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java @@ -37,7 +37,7 @@ public class MultipleFileTaskReportFileWriter implements TaskReportFileWriter private ObjectMapper objectMapper; @Override - public void write(String taskId, Map reports) + public void write(String taskId, TaskReport.ReportMap reports) { final File reportsFile = taskReportFiles.get(taskId); if (reportsFile == null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java index 4d55dd64963..79f880d3076 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java @@ -19,17 +19,13 @@ package org.apache.druid.indexing.common; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializerProvider; import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; -import java.util.Map; public class SingleFileTaskReportFileWriter implements TaskReportFileWriter { @@ -44,7 +40,7 @@ public class SingleFileTaskReportFileWriter implements TaskReportFileWriter } @Override - public void write(String taskId, Map reports) + public void write(String taskId, TaskReport.ReportMap reports) { try { final File reportsFileParent = reportsFile.getParentFile(); @@ -70,20 +66,9 @@ public class SingleFileTaskReportFileWriter implements TaskReportFileWriter public static void writeReportToStream( final ObjectMapper objectMapper, final OutputStream outputStream, - final Map reports + final TaskReport.ReportMap reports ) throws Exception { - final SerializerProvider serializers = objectMapper.getSerializerProviderInstance(); - - try (final JsonGenerator jg = objectMapper.getFactory().createGenerator(outputStream)) { - jg.writeStartObject(); - - for (final Map.Entry entry : reports.entrySet()) { - jg.writeFieldName(entry.getKey()); - JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, entry.getValue()); - } - - jg.writeEndObject(); - } + objectMapper.writeValue(outputStream, reports); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java index 2ac260d72fd..86ed562112f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java @@ -21,9 +21,9 @@ package org.apache.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Optional; import java.util.LinkedHashMap; -import java.util.Map; /** * TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and @@ -31,7 +31,10 @@ import java.util.Map; */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "ingestionStatsAndErrors", value = IngestionStatsAndErrorsTaskReport.class), + @JsonSubTypes.Type( + name = IngestionStatsAndErrorsTaskReport.REPORT_KEY, + value = IngestionStatsAndErrorsTaskReport.class + ), @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value = KillTaskReport.class) }) public interface TaskReport @@ -48,13 +51,29 @@ public interface TaskReport /** * Returns an order-preserving map that is suitable for passing into {@link TaskReportFileWriter#write}. */ - static Map buildTaskReports(TaskReport... taskReports) + static ReportMap buildTaskReports(TaskReport... taskReports) { - // Use LinkedHashMap to preserve order of the reports. - Map taskReportMap = new LinkedHashMap<>(); + ReportMap taskReportMap = new ReportMap(); for (TaskReport taskReport : taskReports) { taskReportMap.put(taskReport.getReportKey(), taskReport); } return taskReportMap; } + + /** + * Represents an ordered map from report key to a TaskReport that is compatible + * for writing out reports to files or serving over HTTP. + *

+ * This class is needed for Jackson serde to work correctly. Without this class, + * a TaskReport is serialized without the type information and cannot be + * deserialized back into a concrete implementation. + */ + class ReportMap extends LinkedHashMap + { + @SuppressWarnings("unchecked") + public Optional findReport(String reportKey) + { + return Optional.fromNullable((T) get(reportKey)); + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java index 908efe6e1a4..972f0b010f3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java @@ -21,11 +21,9 @@ package org.apache.druid.indexing.common; import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.Map; - public interface TaskReportFileWriter { - void write(String taskId, Map reports); + void write(String taskId, TaskReport.ReportMap reports); void setObjectMapper(ObjectMapper objectMapper); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index a5acadf704e..4a093cde2ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -903,11 +903,35 @@ public abstract class AbstractBatchIndexTask extends AbstractTask return null; } + protected TaskReport.ReportMap buildLiveIngestionStatsReport( + IngestionState ingestionState, + Map unparseableEvents, + Map rowStats + ) + { + return TaskReport.buildTaskReports( + new IngestionStatsAndErrorsTaskReport( + getId(), + new IngestionStatsAndErrors( + ingestionState, + unparseableEvents, + rowStats, + null, + false, + 0L, + null, + null, + null + ) + ) + ); + } + /** * Builds a singleton map with {@link IngestionStatsAndErrorsTaskReport#REPORT_KEY} * as key and an {@link IngestionStatsAndErrorsTaskReport} for this task as value. */ - protected Map buildIngestionStatsReport( + protected TaskReport.ReportMap buildIngestionStatsReport( IngestionState ingestionState, String errorMessage, Long segmentsRead, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index c00f219c777..2e4710d43cd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -609,7 +609,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements * * @return Map of reports for the task. */ - private Map getTaskCompletionReports() + private TaskReport.ReportMap getTaskCompletionReports() { return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 59a0a499f91..eabb9e06236 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -502,7 +502,7 @@ public class CompactionTask extends AbstractBatchIndexTask log.info("Generated [%d] compaction task specs", totalNumSpecs); int failCnt = 0; - Map completionReports = new HashMap<>(); + final TaskReport.ReportMap completionReports = new TaskReport.ReportMap(); for (int i = 0; i < indexTaskSpecs.size(); i++) { ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i); final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); @@ -521,9 +521,11 @@ public class CompactionTask extends AbstractBatchIndexTask } String reportKeySuffix = "_" + i; - Optional.ofNullable(eachSpec.getCompletionReports()) - .ifPresent(reports -> completionReports.putAll( - CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix))); + Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent( + reports -> completionReports.putAll( + CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix) + ) + ); } else { failCnt++; log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 5fd3572f9e2..31a4b9c6665 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -680,7 +680,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler return Response.ok(returnMap).build(); } - private Map getTaskCompletionReports() + private TaskReport.ReportMap getTaskCompletionReports() { return buildIngestionStatsReport(ingestionState, errorMsg, null, null); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 5c538d4a0fc..532529ecfc1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -187,7 +187,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler @Nullable private String errorMsg; - private Map completionReports; + private TaskReport.ReportMap completionReports; @JsonCreator public IndexTask( @@ -320,7 +320,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler @Nullable @JsonIgnore - public Map getCompletionReports() + public TaskReport.ReportMap getCompletionReports() { return completionReports; } @@ -415,21 +415,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler ) { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - Map returnMap = new HashMap<>(); - Map ingestionStatsAndErrors = new HashMap<>(); - Map payload = new HashMap<>(); - Map events = getTaskCompletionUnparseableEvents(); - payload.put("ingestionState", ingestionState); - payload.put("unparseableEvents", events); - payload.put("rowStats", doGetRowStats(full != null)); - - ingestionStatsAndErrors.put("taskId", getId()); - ingestionStatsAndErrors.put("payload", payload); - ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); - - returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); - return Response.ok(returnMap).build(); + final TaskReport.ReportMap liveReports = buildLiveIngestionStatsReport( + ingestionState, + getTaskCompletionUnparseableEvents(), + doGetRowStats(full != null) + ); + return Response.ok(liveReports).build(); } @JsonProperty("spec") diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java index a83c3cc7412..7c98b3ee579 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexing.common.TaskReport; import java.util.List; -import java.util.Map; /** * Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is @@ -38,7 +37,7 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport GeneratedPartitionsMetadataReport( @JsonProperty("taskId") String taskId, @JsonProperty("partitionStats") List partitionStats, - @JsonProperty("taskReport") Map taskReport + @JsonProperty("taskReport") TaskReport.ReportMap taskReport ) { super(taskId, partitionStats, taskReport); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index 1fa025d1c91..8fedae5a4f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexing.common.TaskReport; import java.util.List; -import java.util.Map; import java.util.Objects; /** @@ -35,9 +34,9 @@ public class GeneratedPartitionsReport implements SubTaskReport { private final String taskId; private final List partitionStats; - private final Map taskReport; + private final TaskReport.ReportMap taskReport; - GeneratedPartitionsReport(String taskId, List partitionStats, Map taskReport) + GeneratedPartitionsReport(String taskId, List partitionStats, TaskReport.ReportMap taskReport) { this.taskId = taskId; this.partitionStats = partitionStats; @@ -52,7 +51,7 @@ public class GeneratedPartitionsReport implements SubTaskReport } @JsonProperty - public Map getTaskReport() + public TaskReport.ReportMap getTaskReport() { return taskReport; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 4d5b9717ca9..27694360ed9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -202,7 +202,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private volatile Pair, Map> indexGenerateRowStats; private IngestionState ingestionState; - private Map completionReports; + private TaskReport.ReportMap completionReports; private Long segmentsRead; private Long segmentsPublished; private final boolean isCompactionTask; @@ -300,7 +300,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Nullable @JsonIgnore - public Map getCompletionReports() + public TaskReport.ReportMap getCompletionReports() { return completionReports; } @@ -1238,7 +1238,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen /** * Generate an IngestionStatsAndErrorsTaskReport for the task. */ - private Map getTaskCompletionReports(TaskStatus taskStatus) + private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus) { return buildIngestionStatsReport( IngestionState.COMPLETED, @@ -1602,7 +1602,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen // Get stats from completed tasks Map completedSubtaskReports = parallelSinglePhaseRunner.getReports(); for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) { - Map taskReport = pushedSegmentsReport.getTaskReport(); + TaskReport.ReportMap taskReport = pushedSegmentsReport.getTaskReport(); if (taskReport == null || taskReport.isEmpty()) { LOG.warn("Received an empty report from subtask[%s]" + pushedSegmentsReport.getTaskId()); continue; @@ -1642,7 +1642,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen final List unparseableEvents = new ArrayList<>(); long totalSegmentsRead = 0L; for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) { - Map taskReport = generatedPartitionsReport.getTaskReport(); + TaskReport.ReportMap taskReport = generatedPartitionsReport.getTaskReport(); if (taskReport == null || taskReport.isEmpty()) { LOG.warn("Received an empty report from subtask[%s]", generatedPartitionsReport.getTaskId()); continue; @@ -1726,7 +1726,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport( - Map taskReport, + TaskReport.ReportMap taskReport, List unparseableEvents ) { @@ -1804,12 +1804,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } @VisibleForTesting - public Map doGetLiveReports(boolean isFullReport) + public TaskReport.ReportMap doGetLiveReports(boolean isFullReport) { - Map returnMap = new HashMap<>(); - Map ingestionStatsAndErrors = new HashMap<>(); - Map payload = new HashMap<>(); - Pair, Map> rowStatsAndUnparsebleEvents = doGetRowStatsAndUnparseableEvents(isFullReport, true); @@ -1824,16 +1820,11 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen : currentSequentialTask.getIngestionState(); } - payload.put("ingestionState", ingestionStateForReport); - payload.put("unparseableEvents", rowStatsAndUnparsebleEvents.rhs); - payload.put("rowStats", rowStatsAndUnparsebleEvents.lhs); - - ingestionStatsAndErrors.put("taskId", getId()); - ingestionStatsAndErrors.put("payload", payload); - ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); - - returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); - return returnMap; + return buildLiveIngestionStatsReport( + ingestionStateForReport, + rowStatsAndUnparsebleEvents.rhs, + rowStatsAndUnparsebleEvents.lhs + ); } @GET diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 49e3591ff18..b6d382472b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -177,7 +177,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask segments, Map taskReport) + GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, TaskReport.ReportMap taskReport) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index 27604eb7e77..9f36ed63541 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -192,7 +192,7 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask< } @Override - GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, Map taskReport) + GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, TaskReport.ReportMap taskReport) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index bbeb00aa845..0fd6b0916b8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -125,7 +125,7 @@ abstract class PartialSegmentGenerateTask e toolbox.getIndexingTmpDir() ); - Map taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource)); + TaskReport.ReportMap taskReport = getTaskCompletionReports(getNumSegmentsRead(inputSource)); taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport)); @@ -146,7 +146,7 @@ abstract class PartialSegmentGenerateTask e abstract T createGeneratedPartitionsReport( TaskToolbox toolbox, List segments, - Map taskReport + TaskReport.ReportMap taskReport ); private Long getNumSegmentsRead(InputSource inputSource) @@ -249,7 +249,7 @@ abstract class PartialSegmentGenerateTask e /** * Generate an IngestionStatsAndErrorsTaskReport for the task. */ - private Map getTaskCompletionReports(Long segmentsRead) + private TaskReport.ReportMap getTaskCompletionReports(Long segmentsRead) { return buildIngestionStatsReport( IngestionState.COMPLETED, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java index b3467bb6f20..d4c13ca3370 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java @@ -22,12 +22,12 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.SurrogateAction; @@ -189,7 +189,9 @@ abstract class PartialSegmentMergeTask extends PerfectRollu intervalToUnzippedFiles ); - taskClient.report(new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments, ImmutableMap.of())); + taskClient.report( + new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments, new TaskReport.ReportMap()) + ); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java index 8ed373c7d95..9fad997ef78 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.timeline.DataSegment; -import java.util.Map; import java.util.Objects; import java.util.Set; @@ -41,14 +40,14 @@ public class PushedSegmentsReport implements SubTaskReport private final String taskId; private final Set oldSegments; private final Set newSegments; - private final Map taskReport; + private final TaskReport.ReportMap taskReport; @JsonCreator public PushedSegmentsReport( @JsonProperty("taskId") String taskId, @JsonProperty("oldSegments") Set oldSegments, @JsonProperty("segments") Set newSegments, - @JsonProperty("taskReport") Map taskReport + @JsonProperty("taskReport") TaskReport.ReportMap taskReport ) { this.taskId = Preconditions.checkNotNull(taskId, "taskId"); @@ -77,7 +76,7 @@ public class PushedSegmentsReport implements SubTaskReport } @JsonProperty("taskReport") - public Map getTaskReport() + public TaskReport.ReportMap getTaskReport() { return taskReport; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 465750bad03..6af09461c40 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -282,7 +282,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand .transform(PartitionChunk::getObject) .toSet(); - Map taskReport = getTaskCompletionReports(); + TaskReport.ReportMap taskReport = getTaskCompletionReports(); taskClient.report(new PushedSegmentsReport(getId(), oldSegments, pushedSegments, taskReport)); toolbox.getTaskReportFileWriter().write(getId(), taskReport); @@ -542,23 +542,13 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand return Response.ok(doGetRowStats(full != null)).build(); } - private Map doGetLiveReports(boolean isFullReport) + private TaskReport.ReportMap doGetLiveReports(boolean isFullReport) { - Map returnMap = new HashMap<>(); - Map ingestionStatsAndErrors = new HashMap<>(); - Map payload = new HashMap<>(); - Map events = getTaskCompletionUnparseableEvents(); - - payload.put("ingestionState", ingestionState); - payload.put("unparseableEvents", events); - payload.put("rowStats", doGetRowStats(isFullReport)); - - ingestionStatsAndErrors.put("taskId", getId()); - ingestionStatsAndErrors.put("payload", payload); - ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); - - returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); - return returnMap; + return buildLiveIngestionStatsReport( + ingestionState, + getTaskCompletionUnparseableEvents(), + doGetRowStats(isFullReport) + ); } @GET @@ -585,7 +575,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand /** * Generate an IngestionStatsAndErrorsTaskReport for the task. */ - private Map getTaskCompletionReports() + private TaskReport.ReportMap getTaskCompletionReports() { return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, null); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index f3e1e4a06d2..e5a1958a1fa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1118,7 +1118,7 @@ public abstract class SeekableStreamIndexTaskRunner getTaskCompletionReports(@Nullable String errorMsg, long handoffWaitMs) + private TaskReport.ReportMap getTaskCompletionReports(@Nullable String errorMsg, long handoffWaitMs) { return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index e91d25c0b03..fe6c39b17ed 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1678,9 +1678,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand private IngestionStatsAndErrors getTaskReportData() throws IOException { - Map taskReports = OBJECT_MAPPER.readValue( + TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue( reportsFile, - new TypeReference>() + new TypeReference() { } ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 2619a23ba33..4e5dadf4dbf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -2548,9 +2548,9 @@ public class IndexTaskTest extends IngestionTestBase private IngestionStatsAndErrors getTaskReportData() throws IOException { - Map taskReports = jsonMapper.readValue( + TaskReport.ReportMap taskReports = jsonMapper.readValue( taskRunner.getTaskReportsFile(), - new TypeReference>() + new TypeReference() { } ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 2fe8790fb9c..f0f7e329992 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.common.task; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; @@ -270,9 +269,6 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest /** * Converts ParseSpec to InputFormat for indexing tests. To be used until {@link FirehoseFactory} * & {@link InputRowParser} is deprecated and removed. - * - * @param parseSpec - * @return */ public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec) { @@ -510,11 +506,9 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest } } - public Map getReports() throws IOException + public TaskReport.ReportMap getReports() throws IOException { - return objectMapper.readValue(reportsFile, new TypeReference>() - { - }); + return objectMapper.readValue(reportsFile, TaskReport.ReportMap.class); } public List getIngestionReports() throws IOException diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 20de427508f..488f8ce98dd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -1134,7 +1134,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase try { Object payload = getObjectMapper().readValue( taskRunner.getTaskReportsFile(), - new TypeReference>() + new TypeReference() { } ).get(KillTaskReport.REPORT_KEY).getPayload(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java index 0398a219516..dddbe1bd7c3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java @@ -23,12 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskReportFileWriter; -import java.util.Map; - public class NoopTestTaskReportFileWriter implements TaskReportFileWriter { @Override - public void write(String id, Map reports) + public void write(String id, TaskReport.ReportMap reports) { } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index aece6edb3f4..b0fced552b3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -28,6 +28,7 @@ import com.google.common.io.Files; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexing.common.KillTaskReport; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TestUtils; @@ -38,7 +39,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.nio.charset.StandardCharsets; -import java.util.Map; +import java.util.Collections; public class TaskReportSerdeTest { @@ -55,47 +56,56 @@ public class TaskReportSerdeTest } @Test - public void testSerde() throws Exception + public void testSerdeOfIngestionReport() throws Exception { - IngestionStatsAndErrorsTaskReport report1 = new IngestionStatsAndErrorsTaskReport( - "testID", - new IngestionStatsAndErrors( - IngestionState.BUILD_SEGMENTS, - ImmutableMap.of( - "hello", "world" - ), - ImmutableMap.of( - "number", 1234 - ), - "an error message", - true, - 1000L, - ImmutableMap.of("PartitionA", 5000L), - 5L, - 10L - ) - ); - String report1serialized = jsonMapper.writeValueAsString(report1); - IngestionStatsAndErrorsTaskReport report2 = (IngestionStatsAndErrorsTaskReport) jsonMapper.readValue( - report1serialized, - TaskReport.class - ); - Assert.assertEquals(report1, report2); - Assert.assertEquals(report1.hashCode(), report2.hashCode()); + IngestionStatsAndErrorsTaskReport originalReport = buildTestIngestionReport(); + String reportJson = jsonMapper.writeValueAsString(originalReport); + TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class); + Assert.assertTrue(deserialized instanceof IngestionStatsAndErrorsTaskReport); + + IngestionStatsAndErrorsTaskReport deserializedReport = (IngestionStatsAndErrorsTaskReport) deserialized; + Assert.assertEquals(originalReport, deserializedReport); + } + + @Test + public void testSerdeOfKillTaskReport() throws Exception + { + KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3)); + String reportJson = jsonMapper.writeValueAsString(originalReport); + TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class); + + Assert.assertTrue(deserialized instanceof KillTaskReport); + + KillTaskReport deserializedReport = (KillTaskReport) deserialized; + Assert.assertEquals(originalReport, deserializedReport); + } + + @Test + public void testWriteReportMapToFileAndRead() throws Exception + { + IngestionStatsAndErrorsTaskReport report1 = buildTestIngestionReport(); final File reportFile = temporaryFolder.newFile(); final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); writer.setObjectMapper(jsonMapper); - Map reportMap1 = TaskReport.buildTaskReports(report1); + TaskReport.ReportMap reportMap1 = TaskReport.buildTaskReports(report1); writer.write("testID", reportMap1); - Map reportMap2 = jsonMapper.readValue( - reportFile, - new TypeReference>() {} - ); + TaskReport.ReportMap reportMap2 = jsonMapper.readValue(reportFile, TaskReport.ReportMap.class); Assert.assertEquals(reportMap1, reportMap2); } + @Test + public void testWriteReportMapToStringAndRead() throws Exception + { + IngestionStatsAndErrorsTaskReport ingestionReport = buildTestIngestionReport(); + TaskReport.ReportMap reportMap = TaskReport.buildTaskReports(ingestionReport); + String json = jsonMapper.writeValueAsString(reportMap); + + TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(json, TaskReport.ReportMap.class); + Assert.assertEquals(reportMap, deserializedReportMap); + } + @Test public void testSerializationOnMissingPartitionStats() throws Exception { @@ -150,7 +160,7 @@ public class TaskReportSerdeTest final File reportFile = temporaryFolder.newFile(); final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); writer.setObjectMapper(jsonMapper); - writer.write("theTask", ImmutableMap.of("report", new ExceptionalTaskReport())); + writer.write("theTask", TaskReport.buildTaskReports(new ExceptionalTaskReport())); // Read the file, ensure it's incomplete and not valid JSON. This allows callers to determine the report was // not complete when written. @@ -160,6 +170,24 @@ public class TaskReportSerdeTest ); } + private IngestionStatsAndErrorsTaskReport buildTestIngestionReport() + { + return new IngestionStatsAndErrorsTaskReport( + "testID", + new IngestionStatsAndErrors( + IngestionState.BUILD_SEGMENTS, + Collections.singletonMap("hello", "world"), + Collections.singletonMap("number", 1234), + "an error message", + true, + 1000L, + Collections.singletonMap("PartitionA", 5000L), + 5L, + 10L + ) + ); + } + /** * Task report that throws an exception while being serialized. */ diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index db971e1ed86..cb6aa39d5c6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.google.common.base.Preconditions; -import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LocalInputSource; @@ -31,6 +30,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.input.DruidInputSource; @@ -66,7 +66,6 @@ import javax.annotation.Nullable; import java.io.File; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; @SuppressWarnings("SameParameterValue") @@ -183,10 +182,10 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn return getIndexingServiceClient().getPublishedSegments(task); } - Map runTaskAndGetReports(Task task, TaskState expectedTaskStatus) + TaskReport.ReportMap runTaskAndGetReports(Task task, TaskState expectedTaskStatus) { runTaskAndVerifyStatus(task, expectedTaskStatus); - return FutureUtils.getUnchecked(getIndexingServiceClient().taskReportAsMap(task.getId()), true); + return getIndexingServiceClient().getLiveReportsForTask(task.getId()); } protected ParallelIndexSupervisorTask createTask( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index a6a9f6ccdd7..84fac9b82d8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -49,10 +49,13 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -118,7 +121,6 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -546,12 +548,17 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase @Override public ListenableFuture> taskReportAsMap(String taskId) + { + return Futures.immediateFuture(null); + } + + public TaskReport.ReportMap getLiveReportsForTask(String taskId) { final Optional task = getTaskStorage().getTask(taskId); if (!task.isPresent()) { return null; } - return Futures.immediateFuture(((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true)); + return ((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true); } public TaskContainer getTaskContainer(String taskId) @@ -773,20 +780,16 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase } } - protected Map buildExpectedTaskReportSequential( + protected TaskReport.ReportMap buildExpectedTaskReportSequential( String taskId, List expectedUnparseableEvents, RowIngestionMetersTotals expectedDeterminePartitions, RowIngestionMetersTotals expectedTotals ) { - final Map payload = new HashMap<>(); + final Map unparseableEvents = + ImmutableMap.of("determinePartitions", ImmutableList.of(), "buildSegments", expectedUnparseableEvents); - payload.put("ingestionState", IngestionState.COMPLETED); - payload.put( - "unparseableEvents", - ImmutableMap.of("determinePartitions", ImmutableList.of(), "buildSegments", expectedUnparseableEvents) - ); Map emptyAverageMinuteMap = ImmutableMap.of( "processed", 0.0, "processedBytes", 0.0, @@ -801,72 +804,90 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase "15m", emptyAverageMinuteMap ); - payload.put( - "rowStats", - ImmutableMap.of( - "movingAverages", - ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", emptyAverages), - "totals", - ImmutableMap.of("determinePartitions", expectedDeterminePartitions, "buildSegments", expectedTotals) - ) + final Map rowStats = ImmutableMap.of( + "movingAverages", + ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", emptyAverages), + "totals", + ImmutableMap.of("determinePartitions", expectedDeterminePartitions, "buildSegments", expectedTotals) ); - final Map ingestionStatsAndErrors = new HashMap<>(); - ingestionStatsAndErrors.put("taskId", taskId); - ingestionStatsAndErrors.put("payload", payload); - ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); - - return Collections.singletonMap("ingestionStatsAndErrors", ingestionStatsAndErrors); + return TaskReport.buildTaskReports( + new IngestionStatsAndErrorsTaskReport( + taskId, + new IngestionStatsAndErrors( + IngestionState.COMPLETED, + unparseableEvents, + rowStats, + null, + false, + 0L, + null, + null, + null + ) + ) + ); } - protected Map buildExpectedTaskReportParallel( + protected TaskReport.ReportMap buildExpectedTaskReportParallel( String taskId, List expectedUnparseableEvents, RowIngestionMetersTotals expectedTotals ) { - Map returnMap = new HashMap<>(); - Map ingestionStatsAndErrors = new HashMap<>(); - Map payload = new HashMap<>(); + Map unparseableEvents = ImmutableMap.of("buildSegments", expectedUnparseableEvents); + Map rowStats = ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals)); - payload.put("ingestionState", IngestionState.COMPLETED); - payload.put("unparseableEvents", ImmutableMap.of("buildSegments", expectedUnparseableEvents)); - payload.put("rowStats", ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals))); - - ingestionStatsAndErrors.put("taskId", taskId); - ingestionStatsAndErrors.put("payload", payload); - ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); - - returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); - return returnMap; + return TaskReport.buildTaskReports( + new IngestionStatsAndErrorsTaskReport( + taskId, + new IngestionStatsAndErrors( + IngestionState.COMPLETED, + unparseableEvents, + rowStats, + null, + false, + 0L, + null, + null, + null + ) + ) + ); } protected void compareTaskReports( - Map expectedReports, - Map actualReports + TaskReport.ReportMap expectedReports, + TaskReport.ReportMap actualReports ) { - expectedReports = (Map) expectedReports.get("ingestionStatsAndErrors"); - actualReports = (Map) actualReports.get("ingestionStatsAndErrors"); + final Optional expectedReportOptional + = expectedReports.findReport("ingestionStatsAndErrors"); + final Optional actualReportOptional + = actualReports.findReport("ingestionStatsAndErrors"); - Assert.assertEquals(expectedReports.get("taskId"), actualReports.get("taskId")); - Assert.assertEquals(expectedReports.get("type"), actualReports.get("type")); + Assert.assertTrue(expectedReportOptional.isPresent()); + Assert.assertTrue(actualReportOptional.isPresent()); - Map expectedPayload = (Map) expectedReports.get("payload"); - Map actualPayload = (Map) actualReports.get("payload"); - Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState")); + final IngestionStatsAndErrorsTaskReport expectedReport = expectedReportOptional.get(); + final IngestionStatsAndErrorsTaskReport actualReport = actualReportOptional.get(); - Map expectedTotals = (Map) expectedPayload.get("totals"); - Map actualTotals = (Map) actualReports.get("totals"); + Assert.assertEquals(expectedReport.getTaskId(), actualReport.getTaskId()); + Assert.assertEquals(expectedReport.getReportKey(), actualReport.getReportKey()); + + final IngestionStatsAndErrors expectedPayload = expectedReport.getPayload(); + final IngestionStatsAndErrors actualPayload = actualReport.getPayload(); + Assert.assertEquals(expectedPayload.getIngestionState(), actualPayload.getIngestionState()); + + Map expectedTotals = expectedPayload.getRowStats(); + Map actualTotals = actualPayload.getRowStats(); Assert.assertEquals(expectedTotals, actualTotals); List expectedParseExceptionReports = - (List) ((Map) - expectedPayload.get("unparseableEvents")).get("buildSegments"); + (List) (expectedPayload.getUnparseableEvents()).get("buildSegments"); List actualParseExceptionReports = - (List) ((Map) - actualPayload.get("unparseableEvents")).get("buildSegments"); + (List) (actualPayload.getUnparseableEvents()).get("buildSegments"); List expectedMessages = expectedParseExceptionReports .stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java index 0cf7fdd147b..b020c360ba8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java @@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; @@ -43,7 +44,6 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Arrays; -import java.util.Map; public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhaseParallelIndexingTest { @@ -133,8 +133,8 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa false ); - final RowIngestionMetersTotals expectedTotals = RowMeters.with().totalProcessed(200); - final Map expectedReports = + final RowIngestionMetersTotals expectedTotals = RowMeters.with().bytes(5630).totalProcessed(200); + final TaskReport.ReportMap expectedReports = maxNumConcurrentSubTasks <= 1 ? buildExpectedTaskReportSequential( task.getId(), @@ -148,7 +148,7 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa expectedTotals ); - Map actualReports = runTaskAndGetReports(task, TaskState.SUCCESS); + TaskReport.ReportMap actualReports = runTaskAndGetReports(task, TaskState.SUCCESS); compareTaskReports(expectedReports, actualReports); } @@ -169,12 +169,12 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa false, false ); - Map expectedReports = buildExpectedTaskReportParallel( + TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel( task.getId(), ImmutableList.of(), - new RowIngestionMetersTotals(200, 0, 0, 0, 0) + new RowIngestionMetersTotals(200, 5630, 0, 0, 0) ); - Map actualReports = runTaskAndGetReports(task, TaskState.SUCCESS); + TaskReport.ReportMap actualReports = runTaskAndGetReports(task, TaskState.SUCCESS); compareTaskReports(expectedReports, actualReports); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 266604a8542..6a4e5f90c64 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSplit; @@ -35,6 +34,7 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.SegmentAllocators; @@ -713,7 +713,12 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd ); taskClient.report( - new PushedSegmentsReport(getId(), Collections.emptySet(), Collections.singleton(segment), ImmutableMap.of()) + new PushedSegmentsReport( + getId(), + Collections.emptySet(), + Collections.singleton(segment), + new TaskReport.ReportMap() + ) ); return TaskStatus.fromCode(getId(), state); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java index 05d0a1fe9de..3edce650688 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java @@ -20,6 +20,8 @@ package org.apache.druid.indexing.common.task.batch.parallel; import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.indexing.common.KillTaskReport; +import org.apache.druid.indexing.common.TaskReport; import org.junit.Test; public class PushedSegmentsReportTest @@ -27,6 +29,13 @@ public class PushedSegmentsReportTest @Test public void testEquals() { - EqualsVerifier.forClass(PushedSegmentsReport.class).usingGetClass().verify(); + TaskReport.ReportMap map1 = new TaskReport.ReportMap(); + TaskReport.ReportMap map2 = new TaskReport.ReportMap(); + map2.put("killTaskReport", new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3))); + + EqualsVerifier.forClass(PushedSegmentsReport.class) + .usingGetClass() + .withPrefabValues(TaskReport.ReportMap.class, map1, map2) + .verify(); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 7ad9f3c9464..bc2e38f798e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.Tasks; @@ -446,9 +447,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv false, Collections.emptyList() ); - Map actualReports = task.doGetLiveReports(true); - final long processedBytes = useInputFormatApi ? 335 : 0; - Map expectedReports = buildExpectedTaskReportParallel( + TaskReport.ReportMap actualReports = task.doGetLiveReports(true); + TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel( task.getId(), ImmutableList.of( new ParseExceptionReport( @@ -464,7 +464,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv 1L ) ), - new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1) + new RowIngestionMetersTotals(10, 335, 1, 1, 1) ); compareTaskReports(expectedReports, actualReports); } @@ -497,10 +497,9 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId()); final ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask(); - Map actualReports = executedTask.doGetLiveReports(true); + TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true); - final long processedBytes = useInputFormatApi ? 335 : 0; - RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1); + final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 335, 1, 1, 1); List expectedUnparseableEvents = ImmutableList.of( new ParseExceptionReport( "{ts=2017unparseable}", @@ -516,7 +515,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv ) ); - Map expectedReports; + TaskReport.ReportMap expectedReports; if (useInputFormatApi) { expectedReports = buildExpectedTaskReportSequential( task.getId(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index fb454acafec..1f7bd929d8a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -461,9 +461,9 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport protected IngestionStatsAndErrors getTaskReportData() throws IOException { - Map taskReports = OBJECT_MAPPER.readValue( + TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue( reportsFile, - new TypeReference>() + new TypeReference() { } );