From 81d7b6ebe19c3cbecf1b6e48a4b2a2e995962b97 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 15 Apr 2024 08:00:59 +0530 Subject: [PATCH] Fix OverlordClient to read reports as a concrete `ReportMap` (#16226) Follow up to #16217 Changes: - Update `OverlordClient.getReportAsMap()` to return `TaskReport.ReportMap` - Move the following classes to `org.apache.druid.indexer.report` in the `druid-processing` module - `TaskReport` - `KillTaskReport` - `IngestionStatsAndErrorsTaskReport` - `TaskContextReport` - `TaskReportFileWriter` - `SingleFileTaskReportFileWriter` - `TaskReportSerdeTest` - Remove `MsqOverlordResourceTestClient` as it had only one method which is already present in `OverlordResourceTestClient` itself --- .../indexing/kafka/KafkaIndexTaskTest.java | 2 +- .../kinesis/KinesisIndexTaskTest.java | 2 +- .../org/apache/druid/msq/exec/Controller.java | 2 +- .../druid/msq/exec/ControllerContext.java | 2 +- .../apache/druid/msq/exec/ControllerImpl.java | 4 +- .../indexing/IndexerControllerContext.java | 2 +- .../client/ControllerChatHandler.java | 2 +- .../msq/indexing/report/MSQTaskReport.java | 2 +- .../sql/resources/SqlStatementResource.java | 16 +- .../msq/util/SqlStatementResourceHelper.java | 67 ++- .../msq/indexing/WorkerChatHandlerTest.java | 4 +- .../client/ControllerChatHandlerTest.java | 4 +- .../indexing/report/MSQTaskReportTest.java | 9 +- .../resources/SqlStatementResourceTest.java | 46 +- .../msq/test/MSQTestControllerContext.java | 2 +- .../test/MSQTestOverlordServiceClient.java | 21 +- .../druid/msq/test/MSQTestWorkerContext.java | 4 +- .../MultipleFileTaskReportFileWriter.java | 3 + .../druid/indexing/common/TaskToolbox.java | 1 + .../indexing/common/TaskToolboxFactory.java | 1 + .../common/task/AbstractBatchIndexTask.java | 8 +- .../AppenderatorDriverRealtimeIndexTask.java | 8 +- .../indexing/common/task/CompactionTask.java | 2 +- .../indexing/common/task/HadoopIndexTask.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 2 +- .../common/task/KillUnusedSegmentsTask.java | 4 +- .../GeneratedPartitionsMetadataReport.java | 2 +- .../parallel/GeneratedPartitionsReport.java | 2 +- .../parallel/ParallelIndexSupervisorTask.java | 23 +- .../PartialHashSegmentGenerateTask.java | 2 +- .../PartialRangeSegmentGenerateTask.java | 2 +- .../parallel/PartialSegmentGenerateTask.java | 2 +- .../parallel/PartialSegmentMergeTask.java | 2 +- .../batch/parallel/PushedSegmentsReport.java | 2 +- .../batch/parallel/SinglePhaseSubTask.java | 4 +- .../overlord/ThreadingTaskRunner.java | 2 +- .../SeekableStreamIndexTaskRunner.java | 8 +- ...penderatorDriverRealtimeIndexTaskTest.java | 6 +- .../task/CompactionTaskParallelRunTest.java | 2 +- .../common/task/CompactionTaskRunTest.java | 4 +- .../indexing/common/task/IndexTaskTest.java | 4 +- .../common/task/IngestionTestBase.java | 8 +- .../task/KillUnusedSegmentsTaskTest.java | 9 +- .../task/NoopTestTaskReportFileWriter.java | 4 +- .../common/task/ParseExceptionReport.java | 2 +- .../common/task/TaskReportSerdeTest.java | 233 ---------- ...bstractMultiPhaseParallelIndexingTest.java | 5 +- ...stractParallelIndexSupervisorTaskTest.java | 92 ++-- ...ultiPhaseParallelIndexingRowStatsTest.java | 2 +- ...rallelIndexSupervisorTaskResourceTest.java | 2 +- .../ParallelIndexSupervisorTaskTest.java | 4 +- .../parallel/PushedSegmentsReportTest.java | 4 +- .../SinglePhaseParallelIndexingTest.java | 37 +- .../common/tasklogs/FileTaskLogsTest.java | 2 +- .../SingleTaskBackgroundRunnerTest.java | 2 +- .../overlord/TestTaskToolboxFactory.java | 2 +- .../SeekableStreamIndexTaskTestBase.java | 11 +- .../indexer/AbstractITBatchIndexTest.java | 8 +- .../druid/testsEx/msq/ITMultiStageQuery.java | 7 +- .../clients/OverlordResourceTestClient.java | 10 +- .../msq/MsqOverlordResourceTestClient.java | 79 ---- .../testing/utils/MsqTestQueryHelper.java | 15 +- .../indexer/AbstractITBatchIndexTest.java | 8 +- .../tests/indexer/ITCompactionTaskTest.java | 6 +- .../report}/IngestionStatsAndErrors.java | 2 +- .../IngestionStatsAndErrorsTaskReport.java | 2 +- .../druid/indexer/report}/KillTaskReport.java | 2 +- .../SingleFileTaskReportFileWriter.java | 2 +- .../indexer/report}/TaskContextReport.java | 2 +- .../druid/indexer/report}/TaskReport.java | 6 +- .../indexer/report}/TaskReportFileWriter.java | 2 +- .../indexer/report/TaskReportSerdeTest.java | 397 ++++++++++++++++++ .../druid/rpc/indexing/OverlordClient.java | 3 +- .../rpc/indexing/OverlordClientImpl.java | 5 +- .../client/indexing/NoopOverlordClient.java | 3 +- .../rpc/indexing/OverlordClientImplTest.java | 10 +- .../java/org/apache/druid/cli/CliIndexer.java | 2 +- .../java/org/apache/druid/cli/CliPeon.java | 4 +- 78 files changed, 714 insertions(+), 569 deletions(-) delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java delete mode 100644 integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java rename {indexing-service/src/main/java/org/apache/druid/indexing/common => processing/src/main/java/org/apache/druid/indexer/report}/IngestionStatsAndErrors.java (99%) rename {indexing-service/src/main/java/org/apache/druid/indexing/common => processing/src/main/java/org/apache/druid/indexer/report}/IngestionStatsAndErrorsTaskReport.java (98%) rename {indexing-service/src/main/java/org/apache/druid/indexing/common => processing/src/main/java/org/apache/druid/indexer/report}/KillTaskReport.java (98%) rename {indexing-service/src/main/java/org/apache/druid/indexing/common => processing/src/main/java/org/apache/druid/indexer/report}/SingleFileTaskReportFileWriter.java (98%) rename {indexing-service/src/main/java/org/apache/druid/indexing/common => processing/src/main/java/org/apache/druid/indexer/report}/TaskContextReport.java (98%) rename {indexing-service/src/main/java/org/apache/druid/indexing/common => processing/src/main/java/org/apache/druid/indexer/report}/TaskReport.java (95%) rename {indexing-service/src/main/java/org/apache/druid/indexing/common => processing/src/main/java/org/apache/druid/indexer/report}/TaskReportFileWriter.java (96%) create mode 100644 processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 85a9fdb6254..5ee288d2622 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -54,7 +54,7 @@ import org.apache.druid.data.input.kafkainput.KafkaInputFormat; import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.IndexTaskTest; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index ad8964d712c..c1adf018b21 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -43,7 +43,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java index 1fa23e3d1b7..5e23a42b2fa 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java @@ -22,7 +22,7 @@ package org.apache.druid.msq.exec; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.msq.counters.CounterSnapshots; import org.apache.druid.msq.counters.CounterSnapshotsTree; import org.apache.druid.msq.indexing.MSQControllerTask; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 35aa1c79ef0..0aa90688b91 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -22,7 +22,7 @@ package org.apache.druid.msq.exec; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Injector; import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 63aaf7b4de8..918a4fd2969 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -68,11 +68,11 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.report.TaskContextReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.TaskContextReport; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.LockReleaseAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 1426726d592..aeee05e7506 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -24,7 +24,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.guice.annotations.Self; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.io.Closer; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java index 2d979fb8bda..4be026ac34c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java @@ -19,7 +19,7 @@ package org.apache.druid.msq.indexing.client; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.msq.counters.CounterSnapshots; import org.apache.druid.msq.counters.CounterSnapshotsTree; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQTaskReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQTaskReport.java index 0fbe6fec587..969a690c193 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQTaskReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQTaskReport.java @@ -22,7 +22,7 @@ package org.apache.druid.msq.indexing.report; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; @JsonTypeName(MSQTaskReport.REPORT_KEY) public class MSQTaskReport implements TaskReport diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 91145985ee1..cc360a48ede 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -512,12 +512,11 @@ public class SqlStatementResource ) { if (sqlStatementState == SqlStatementState.SUCCESS) { - Map payload = + MSQTaskReportPayload msqTaskReportPayload = SqlStatementResourceHelper.getPayload(contactOverlord( overlordClient.taskReportAsMap(queryId), queryId )); - MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(payload, MSQTaskReportPayload.class); Optional> pageList = SqlStatementResourceHelper.populatePageList( msqTaskReportPayload, msqDestination @@ -607,7 +606,8 @@ public class SqlStatementResource taskResponse, statusPlus, sqlStatementState, - contactOverlord(overlordClient.taskReportAsMap(queryId), queryId) + contactOverlord(overlordClient.taskReportAsMap(queryId), queryId), + jsonMapper ); } else { Optional> signature = SqlStatementResourceHelper.getSignature(msqControllerTask); @@ -735,8 +735,9 @@ public class SqlStatementResource ); } - MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload( - contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)), MSQTaskReportPayload.class); + MSQTaskReportPayload msqTaskReportPayload = SqlStatementResourceHelper.getPayload( + contactOverlord(overlordClient.taskReportAsMap(queryId), queryId) + ); if (msqTaskReportPayload.getResults().getResultYielder() == null) { results = Optional.empty(); @@ -746,8 +747,9 @@ public class SqlStatementResource } else if (msqControllerTask.getQuerySpec().getDestination() instanceof DurableStorageMSQDestination) { - MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload( - contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)), MSQTaskReportPayload.class); + MSQTaskReportPayload msqTaskReportPayload = SqlStatementResourceHelper.getPayload( + contactOverlord(overlordClient.taskReportAsMap(queryId), queryId) + ); List pages = SqlStatementResourceHelper.populatePageList( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java index b60f81ecca6..f90959a5666 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java @@ -32,9 +32,11 @@ import org.apache.druid.frame.processor.FrameProcessors; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.CounterSnapshots; import org.apache.druid.msq.counters.CounterSnapshotsTree; @@ -45,7 +47,10 @@ import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination; import org.apache.druid.msq.indexing.destination.MSQDestination; import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; +import org.apache.druid.msq.indexing.error.MSQErrorReport; +import org.apache.druid.msq.indexing.error.MSQFault; import org.apache.druid.msq.indexing.report.MSQStagesReport; +import org.apache.druid.msq.indexing.report.MSQTaskReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.sql.SqlStatementState; @@ -243,12 +248,13 @@ public class SqlStatementResourceHelper TaskStatusResponse taskResponse, TaskStatusPlus statusPlus, SqlStatementState sqlStatementState, - Map msqPayload + TaskReport.ReportMap msqPayload, + ObjectMapper jsonMapper ) { - Map exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload)); - Map exception = getMap(exceptionDetails, "error"); - if (exceptionDetails == null || exception == null) { + final MSQErrorReport exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload)); + final MSQFault fault = exceptionDetails == null ? null : exceptionDetails.getFault(); + if (exceptionDetails == null || fault == null) { return Optional.of(new SqlStatementResult( queryId, sqlStatementState, @@ -258,18 +264,15 @@ public class SqlStatementResourceHelper null, DruidException.forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.UNCATEGORIZED) - .build("%s", taskResponse.getStatus().getErrorMsg()).toErrorResponse() + .build("%s", taskResponse.getStatus().getErrorMsg()) + .toErrorResponse() )); } - final String errorMessage = String.valueOf(exception.getOrDefault("errorMessage", statusPlus.getErrorMsg())); - exception.remove("errorMessage"); - String errorCode = String.valueOf(exception.getOrDefault("errorCode", "unknown")); - exception.remove("errorCode"); - Map stringException = new HashMap<>(); - for (Map.Entry exceptionKeys : exception.entrySet()) { - stringException.put(exceptionKeys.getKey(), String.valueOf(exceptionKeys.getValue())); - } + final String errorMessage = fault.getErrorMessage() == null ? statusPlus.getErrorMsg() : fault.getErrorMessage(); + final String errorCode = fault.getErrorCode() == null ? "unknown" : fault.getErrorCode(); + + final Map exceptionContext = buildExceptionContext(fault, jsonMapper); return Optional.of(new SqlStatementResult( queryId, sqlStatementState, @@ -285,7 +288,7 @@ public class SqlStatementResourceHelper DruidException ex = bob.forPersona(DruidException.Persona.USER) .ofCategory(DruidException.Category.UNCATEGORIZED) .build(errorMessage); - ex.withContext(stringException); + ex.withContext(exceptionContext); return ex; } }).toErrorResponse() @@ -361,22 +364,42 @@ public class SqlStatementResourceHelper return null; } - public static Map getQueryExceptionDetails(Map payload) + @Nullable + private static MSQErrorReport getQueryExceptionDetails(MSQTaskReportPayload payload) { - return getMap(getMap(payload, "status"), "errorReport"); + return payload == null ? null : payload.getStatus().getErrorReport(); } - public static Map getMap(Map map, String key) + @Nullable + public static MSQTaskReportPayload getPayload(TaskReport.ReportMap reportMap) { - if (map == null) { + if (reportMap == null) { return null; } - return (Map) map.get(key); + + Optional report = reportMap.findReport("multiStageQuery"); + return report.map(MSQTaskReport::getPayload).orElse(null); } - public static Map getPayload(Map results) + private static Map buildExceptionContext(MSQFault fault, ObjectMapper mapper) { - Map msqReport = getMap(results, "multiStageQuery"); - return getMap(msqReport, "payload"); + try { + final Map msqFaultAsMap = new HashMap<>( + mapper.readValue( + mapper.writeValueAsBytes(fault), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ) + ); + msqFaultAsMap.remove("errorCode"); + msqFaultAsMap.remove("errorMessage"); + + final Map exceptionContext = new HashMap<>(); + msqFaultAsMap.forEach((key, value) -> exceptionContext.put(key, String.valueOf(value))); + + return exceptionContext; + } + catch (Exception e) { + throw DruidException.defensive("Could not read MSQFault[%s] as a map: [%s]", fault, e.getMessage()); + } } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java index 70ed5651300..9fe32cc8c8c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java @@ -22,8 +22,8 @@ package org.apache.druid.msq.indexing; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.TaskReport; -import org.apache.druid.indexing.common.TaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java index 81da98d6fa7..10a724f4b7e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java @@ -19,8 +19,8 @@ package org.apache.druid.msq.indexing.client; -import org.apache.druid.indexing.common.KillTaskReport; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.KillTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.indexing.MSQControllerTask; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java index f5b6b29119f..158f65a0594 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java @@ -19,7 +19,6 @@ package org.apache.druid.msq.indexing.report; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -28,8 +27,8 @@ import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.KeyColumn; import org.apache.druid.frame.key.KeyOrder; import org.apache.druid.indexer.TaskState; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; @@ -243,9 +242,7 @@ public class MSQTaskReportTest final TaskReport.ReportMap reportMap = mapper.readValue( reportFile, - new TypeReference() - { - } + TaskReport.ReportMap.class ); final MSQTaskReport report2 = (MSQTaskReport) reportMap.get(MSQTaskReport.REPORT_KEY); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index 257efa4c307..b80e59223f7 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -19,8 +19,6 @@ package org.apache.druid.msq.sql.resources; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -33,7 +31,7 @@ import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -45,7 +43,6 @@ import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.CounterSnapshots; import org.apache.druid.msq.counters.CounterSnapshotsTree; -import org.apache.druid.msq.guice.MSQIndexingModule; import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; @@ -105,28 +102,23 @@ import java.util.ArrayDeque; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.function.Supplier; public class SqlStatementResourceTest extends MSQTestBase { public static final DateTime CREATED_TIME = DateTimes.of("2023-05-31T12:00Z"); private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); + private static final String ACCEPTED_SELECT_MSQ_QUERY = "QUERY_ID_1"; private static final String RUNNING_SELECT_MSQ_QUERY = "QUERY_ID_2"; private static final String FINISHED_SELECT_MSQ_QUERY = "QUERY_ID_3"; - private static final String ERRORED_SELECT_MSQ_QUERY = "QUERY_ID_4"; - private static final String RUNNING_NON_MSQ_TASK = "QUERY_ID_5"; - private static final String FAILED_NON_MSQ_TASK = "QUERY_ID_6"; - private static final String FINISHED_NON_MSQ_TASK = "QUERY_ID_7"; - private static final String ACCEPTED_INSERT_MSQ_TASK = "QUERY_ID_8"; - private static final String RUNNING_INSERT_MSQ_QUERY = "QUERY_ID_9"; private static final String FINISHED_INSERT_MSQ_QUERY = "QUERY_ID_10"; private static final String ERRORED_INSERT_MSQ_QUERY = "QUERY_ID_11"; @@ -235,7 +227,7 @@ public class SqlStatementResourceTest extends MSQTestBase new Object[]{234, "foo1", "bar1"} ); - private final MSQTaskReport selectTaskReport = new MSQTaskReport( + private final Supplier selectTaskReport = () -> new MSQTaskReport( FINISHED_SELECT_MSQ_QUERY, new MSQTaskReportPayload( new MSQStatusReport( @@ -350,10 +342,10 @@ public class SqlStatementResourceTest extends MSQTestBase private static final String FAILURE_MSG = "failure msg"; private static SqlStatementResource resource; - private static String SUPERUSER = "superuser"; - private static String STATE_R_USER = "stateR"; - private static String STATE_W_USER = "stateW"; - private static String STATE_RW_USER = "stateRW"; + private static final String SUPERUSER = "superuser"; + private static final String STATE_R_USER = "stateR"; + private static final String STATE_W_USER = "stateW"; + private static final String STATE_RW_USER = "stateRW"; private AuthorizerMapper authorizerMapper = new AuthorizerMapper(null) { @@ -394,12 +386,8 @@ public class SqlStatementResourceTest extends MSQTestBase @Mock private OverlordClient overlordClient; - final ObjectMapper mapper = TestHelper.makeJsonMapper() - .registerModules(new MSQIndexingModule().getJacksonModules()); - - private void setupMocks(OverlordClient indexingServiceClient) throws JsonProcessingException + private void setupMocks(OverlordClient indexingServiceClient) { - Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ACCEPTED_SELECT_MSQ_QUERY))) .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ACCEPTED_SELECT_MSQ_QUERY, new TaskStatusPlus( ACCEPTED_SELECT_MSQ_QUERY, @@ -466,13 +454,7 @@ public class SqlStatementResourceTest extends MSQTestBase Mockito.when(indexingServiceClient.taskReportAsMap(FINISHED_SELECT_MSQ_QUERY)) - .thenReturn(Futures.immediateFuture(mapper.readValue( - mapper.writeValueAsString(TaskReport.buildTaskReports(selectTaskReport)), - new TypeReference>() - { - } - ))); - + .thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get()))); Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY))) .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ERRORED_SELECT_MSQ_QUERY, new TaskStatusPlus( @@ -605,13 +587,7 @@ public class SqlStatementResourceTest extends MSQTestBase )))); Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(FINISHED_INSERT_MSQ_QUERY))) - .thenReturn(Futures.immediateFuture(mapper.readValue( - mapper.writeValueAsString(TaskReport.buildTaskReports( - MSQ_INSERT_TASK_REPORT)), - new TypeReference>() - { - } - ))); + .thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT))); Mockito.when(indexingServiceClient.taskPayload(FINISHED_INSERT_MSQ_QUERY)) .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index e0b0a837de4..45de3d7c4f5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -31,7 +31,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java index db878d70024..4a5ac7e84e6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java @@ -19,7 +19,6 @@ package org.apache.druid.msq.test; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -31,7 +30,7 @@ import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -42,7 +41,6 @@ import org.apache.druid.msq.indexing.MSQControllerTask; import org.joda.time.DateTime; import javax.annotation.Nullable; -import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -121,22 +119,9 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient } @Override - public ListenableFuture> taskReportAsMap(String taskId) + public ListenableFuture taskReportAsMap(String taskId) { - SettableFuture> future = SettableFuture.create(); - try { - future.set( - objectMapper.readValue( - objectMapper.writeValueAsBytes(getReportForTask(taskId)), - new TypeReference>() - { - } - )); - } - catch (IOException e) { - throw new RuntimeException(e); - } - return future; + return Futures.immediateFuture(getReportForTask(taskId)); } @Override diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java index eb4976701eb..d2283a94be0 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java @@ -22,8 +22,8 @@ package org.apache.druid.msq.test; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Injector; import org.apache.druid.frame.processor.Bouncer; -import org.apache.druid.indexing.common.TaskReport; -import org.apache.druid.indexing.common.TaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.io.Closer; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java index e6bc5e67f3a..28cc1ae2af5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java @@ -20,6 +20,9 @@ package org.apache.druid.indexing.common; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.logger.Logger; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index e828e5e3f07..62d649894f8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -32,6 +32,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 6f009d0b889..669a30dc5de 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -34,6 +34,7 @@ import org.apache.druid.guice.annotations.AttemptId; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Parent; import org.apache.druid.guice.annotations.RemoteChatHandler; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.Task; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 4486f837376..bfe110a33e0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -31,13 +31,13 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.IngestionState; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskContextReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.TaskContextReport; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 19f8a775b27..1c581cd6d9c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -40,16 +40,16 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskContextReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.TaskContextReport; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 102019b94b0..e5df64c5e79 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -48,10 +48,10 @@ import org.apache.druid.indexer.Property; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 31a4b9c6665..21b17830668 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -43,9 +43,9 @@ import org.apache.druid.indexer.MetadataStorageUpdaterJobHandler; import org.apache.druid.indexer.TaskMetricsGetter; import org.apache.druid.indexer.TaskMetricsUtils; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index c2160686ab2..329bc0d8718 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -48,9 +48,9 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SecondaryPartitionType; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 8c61a6b2184..acd490a6d21 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -29,9 +29,9 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.KillTaskReport; +import org.apache.druid.indexer.report.KillTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskLock; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java index 7c98b3ee579..2822d163d14 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java @@ -21,7 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import java.util.List; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index 8fedae5a4f1..0934d5154a9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -20,7 +20,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import java.util.List; import java.util.Objects; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 826bfd243bc..5d72f4a177c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -40,10 +40,10 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; @@ -1683,21 +1683,24 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters(); for (String runningTaskId : runningTaskIds) { try { - final Map report = getTaskReport(toolbox.getOverlordClient(), runningTaskId); + final TaskReport.ReportMap report = getTaskReport(toolbox.getOverlordClient(), runningTaskId); if (report == null || report.isEmpty()) { // task does not have a running report yet continue; } - Map ingestionStatsAndErrors = (Map) report.get("ingestionStatsAndErrors"); - Map payload = (Map) ingestionStatsAndErrors.get("payload"); - Map rowStats = (Map) payload.get("rowStats"); + final IngestionStatsAndErrorsTaskReport ingestionStatsReport + = (IngestionStatsAndErrorsTaskReport) report.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY); + + final IngestionStatsAndErrors payload = ingestionStatsReport.getPayload(); + + Map rowStats = payload.getRowStats(); Map totals = (Map) rowStats.get("totals"); Map buildSegments = (Map) totals.get(RowIngestionMeters.BUILD_SEGMENTS); if (includeUnparseable) { - Map taskUnparseableEvents = (Map) payload.get("unparseableEvents"); + Map taskUnparseableEvents = payload.getUnparseableEvents(); List buildSegmentsUnparseableEvents = (List) taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS); unparseableEvents.addAll(buildSegmentsUnparseableEvents); @@ -1804,7 +1807,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } @VisibleForTesting - public TaskReport.ReportMap doGetLiveReports(boolean isFullReport) + TaskReport.ReportMap doGetLiveReports(boolean isFullReport) { Pair, Map> rowStatsAndUnparsebleEvents = doGetRowStatsAndUnparseableEvents(isFullReport, true); @@ -1846,7 +1849,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen */ @Nullable @VisibleForTesting - static Map getTaskReport(final OverlordClient overlordClient, final String taskId) + static TaskReport.ReportMap getTaskReport(final OverlordClient overlordClient, final String taskId) throws InterruptedException, ExecutionException { try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index b6d382472b3..383fc7afb2e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index 9f36ed63541..cf7ae15a9a5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -27,7 +27,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 0fd6b0916b8..768d118d84a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -25,8 +25,8 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import org.apache.druid.indexing.common.task.BatchAppenderators; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java index d4c13ca3370..e8f1effcfe1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java @@ -26,8 +26,8 @@ import com.google.common.collect.Maps; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskLock; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.SurrogateAction; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java index 9fad997ef78..c93906b11a2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java @@ -22,7 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.timeline.DataSegment; import java.util.Objects; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 6af09461c40..0b4f62e6388 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -32,8 +32,8 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -542,7 +542,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand return Response.ok(doGetRowStats(full != null)).build(); } - private TaskReport.ReportMap doGetLiveReports(boolean isFullReport) + TaskReport.ReportMap doGetLiveReports(boolean isFullReport) { return buildLiveIngestionStatsReport( ingestionState, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java index 55dba716220..70b4a927af9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java @@ -33,8 +33,8 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReportFileWriter; import org.apache.druid.indexing.common.TaskStorageDirTracker; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 12ed2348310..96e1dd40145 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -51,14 +51,14 @@ import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskContextReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.TaskContextReport; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index fe6c39b17ed..79c4ef86ec6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -52,10 +52,10 @@ import org.apache.druid.error.DruidException; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 5c42ea4da4d..e944ad2aac2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -39,7 +39,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 7c14bc96dff..82c07687536 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -42,12 +42,12 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 4e5dadf4dbf..c83edca79b0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -45,9 +45,9 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.config.TaskConfig; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index f0f7e329992..032142bba85 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -36,11 +36,11 @@ import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.RegexInputFormat; import org.apache.druid.data.input.impl.RegexParseSpec; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.SegmentInsertAction; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 488f8ce98dd..855e9cbc70c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -19,15 +19,14 @@ package org.apache.druid.indexing.common.task; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexer.TaskState; -import org.apache.druid.indexing.common.KillTaskReport; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.KillTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -1134,9 +1133,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase try { Object payload = getObjectMapper().readValue( taskRunner.getTaskReportsFile(), - new TypeReference() - { - } + TaskReport.ReportMap.class ).get(KillTaskReport.REPORT_KEY).getPayload(); return getObjectMapper().convertValue(payload, KillTaskReport.Stats.class); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java index dddbe1bd7c3..7e7860e9d8e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java @@ -20,8 +20,8 @@ package org.apache.druid.indexing.common.task; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.indexing.common.TaskReport; -import org.apache.druid.indexing.common.TaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; +import org.apache.druid.indexer.report.TaskReportFileWriter; public class NoopTestTaskReportFileWriter implements TaskReportFileWriter { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java index 0c95773da92..fdeaf68f251 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java @@ -19,7 +19,7 @@ package org.apache.druid.indexing.common.task; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; import java.util.ArrayList; import java.util.LinkedHashMap; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java deleted file mode 100644 index 404b1cde1f5..00000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.task; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.Files; -import org.apache.druid.indexer.IngestionState; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.KillTaskReport; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskContextReport; -import org.apache.druid.indexing.common.TaskReport; -import org.apache.druid.indexing.common.TestUtils; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.nio.charset.StandardCharsets; -import java.util.Collections; - -public class TaskReportSerdeTest -{ - private final ObjectMapper jsonMapper; - - @Rule - public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - public TaskReportSerdeTest() - { - TestUtils testUtils = new TestUtils(); - jsonMapper = testUtils.getTestObjectMapper(); - jsonMapper.registerSubtypes(ExceptionalTaskReport.class); - } - - @Test - public void testSerdeOfIngestionReport() throws Exception - { - IngestionStatsAndErrorsTaskReport originalReport = buildTestIngestionReport(); - String reportJson = jsonMapper.writeValueAsString(originalReport); - TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class); - - Assert.assertTrue(deserialized instanceof IngestionStatsAndErrorsTaskReport); - - IngestionStatsAndErrorsTaskReport deserializedReport = (IngestionStatsAndErrorsTaskReport) deserialized; - Assert.assertEquals(originalReport, deserializedReport); - } - - @Test - public void testSerdeOfKillTaskReport() throws Exception - { - KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3)); - String reportJson = jsonMapper.writeValueAsString(originalReport); - TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class); - - Assert.assertTrue(deserialized instanceof KillTaskReport); - - KillTaskReport deserializedReport = (KillTaskReport) deserialized; - Assert.assertEquals(originalReport, deserializedReport); - } - - @Test - public void testSerdeOfTaskContextReport() throws Exception - { - TaskContextReport originalReport = new TaskContextReport( - "taskId", - ImmutableMap.of("key1", "value1", "key2", "value2") - ); - String reportJson = jsonMapper.writeValueAsString(originalReport); - TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class); - - Assert.assertTrue(deserialized instanceof TaskContextReport); - - TaskContextReport deserializedReport = (TaskContextReport) deserialized; - Assert.assertEquals(originalReport, deserializedReport); - } - - @Test - public void testWriteReportMapToFileAndRead() throws Exception - { - IngestionStatsAndErrorsTaskReport report1 = buildTestIngestionReport(); - final File reportFile = temporaryFolder.newFile(); - final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); - writer.setObjectMapper(jsonMapper); - TaskReport.ReportMap reportMap1 = TaskReport.buildTaskReports(report1); - writer.write("testID", reportMap1); - - TaskReport.ReportMap reportMap2 = jsonMapper.readValue(reportFile, TaskReport.ReportMap.class); - Assert.assertEquals(reportMap1, reportMap2); - } - - @Test - public void testWriteReportMapToStringAndRead() throws Exception - { - IngestionStatsAndErrorsTaskReport ingestionReport = buildTestIngestionReport(); - TaskReport.ReportMap reportMap = TaskReport.buildTaskReports(ingestionReport); - String json = jsonMapper.writeValueAsString(reportMap); - - TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(json, TaskReport.ReportMap.class); - Assert.assertEquals(reportMap, deserializedReportMap); - } - - @Test - public void testSerializationOnMissingPartitionStats() throws Exception - { - String json = "{\n" - + " \"type\": \"ingestionStatsAndErrors\",\n" - + " \"taskId\": \"ingestionStatsAndErrors\",\n" - + " \"payload\": {\n" - + " \"ingestionState\": \"COMPLETED\",\n" - + " \"unparseableEvents\": {\n" - + " \"hello\": \"world\"\n" - + " },\n" - + " \"rowStats\": {\n" - + " \"number\": 1234\n" - + " },\n" - + " \"errorMsg\": \"an error message\",\n" - + " \"segmentAvailabilityConfirmed\": true,\n" - + " \"segmentAvailabilityWaitTimeMs\": 1000\n" - + " }\n" - + "}"; - - IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport( - IngestionStatsAndErrorsTaskReport.REPORT_KEY, - new IngestionStatsAndErrors( - IngestionState.COMPLETED, - ImmutableMap.of( - "hello", "world" - ), - ImmutableMap.of( - "number", 1234 - ), - "an error message", - true, - 1000L, - null, - null, - null - ) - ); - - Assert.assertEquals(expected, jsonMapper.readValue( - json, - new TypeReference() - { - } - )); - } - - @Test - public void testExceptionWhileWritingReport() throws Exception - { - final File reportFile = temporaryFolder.newFile(); - final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); - writer.setObjectMapper(jsonMapper); - writer.write("theTask", TaskReport.buildTaskReports(new ExceptionalTaskReport())); - - // Read the file, ensure it's incomplete and not valid JSON. This allows callers to determine the report was - // not complete when written. - Assert.assertEquals( - "{\"report\":{\"type\":\"exceptional\"", - Files.asCharSource(reportFile, StandardCharsets.UTF_8).read() - ); - } - - private IngestionStatsAndErrorsTaskReport buildTestIngestionReport() - { - return new IngestionStatsAndErrorsTaskReport( - "testID", - new IngestionStatsAndErrors( - IngestionState.BUILD_SEGMENTS, - Collections.singletonMap("hello", "world"), - Collections.singletonMap("number", 1234), - "an error message", - true, - 1000L, - Collections.singletonMap("PartitionA", 5000L), - 5L, - 10L - ) - ); - } - - /** - * Task report that throws an exception while being serialized. - */ - @JsonTypeName("exceptional") - private static class ExceptionalTaskReport implements TaskReport - { - @Override - @JsonProperty - public String getTaskId() - { - throw new UnsupportedOperationException("cannot serialize task ID"); - } - - @Override - public String getReportKey() - { - return "report"; - } - - @Override - @JsonProperty - public Object getPayload() - { - throw new UnsupportedOperationException("cannot serialize payload"); - } - } -} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index cb6aa39d5c6..dc1a1a31e27 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.google.common.base.Preconditions; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LocalInputSource; @@ -28,9 +29,9 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.input.DruidInputSource; @@ -185,7 +186,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn TaskReport.ReportMap runTaskAndGetReports(Task task, TaskState expectedTaskStatus) { runTaskAndVerifyStatus(task, expectedTaskStatus); - return getIndexingServiceClient().getLiveReportsForTask(task.getId()); + return FutureUtils.getUnchecked(getIndexingServiceClient().taskReportAsMap(task.getId()), true); } protected ParallelIndexSupervisorTask createTask( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 84fac9b82d8..7caf64c4900 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -49,13 +48,13 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -119,6 +118,7 @@ import org.junit.rules.TestName; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -128,6 +128,7 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -327,10 +328,12 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase private volatile Future statusFuture; @MonotonicNonNull private volatile TestLocalTaskActionClient actionClient; + private final CountDownLatch taskFinishLatch; - private TaskContainer(Task task) + private TaskContainer(Task task, CountDownLatch taskFinishLatch) { this.task = task; + this.taskFinishLatch = taskFinishLatch; } public Task getTask() @@ -356,6 +359,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase private final ScheduledExecutorService taskKiller = Execs.scheduledSingleThreaded("simple-threading-task-killer"); private final Set killedSubtaskSpecs = new HashSet<>(); + private volatile boolean useTaskFinishLatches = false; + private final List allTaskLatches = new ArrayList<>(); SimpleThreadingTaskRunner(String threadNameBase) { @@ -392,12 +397,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase taskKiller.shutdownNow(); } - public String run(Task task) - { - runTask(task); - return task.getId(); - } - private TaskStatus runAndWait(Task task) { try { @@ -439,9 +438,29 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase return tasks.get(taskId); } + private void setUseTaskFinishLatches(boolean useLatches) + { + this.useTaskFinishLatches = useLatches; + if (!useLatches) { + countDownAllTaskLatches(); + } + } + + private void countDownAllTaskLatches() + { + allTaskLatches.forEach(latch -> { + if (latch.getCount() > 0) { + latch.countDown(); + } + }); + } + private Future runTask(Task task) { - final TaskContainer taskContainer = new TaskContainer(task); + final CountDownLatch taskFinishLatch = useTaskFinishLatches ? new CountDownLatch(1) : new CountDownLatch(0); + allTaskLatches.add(taskFinishLatch); + + final TaskContainer taskContainer = new TaskContainer(task, taskFinishLatch); if (tasks.put(task.getId(), taskContainer) != null) { throw new ISE("Duplicate task ID[%s]", task.getId()); } @@ -457,7 +476,9 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase final TaskToolbox toolbox = createTaskToolbox(task, actionClient); taskContainer.setActionClient(actionClient); if (task.isReady(toolbox.getTaskActionClient())) { - return task.run(toolbox); + TaskStatus status = task.run(toolbox); + taskFinishLatch.await(); + return status; } else { getTaskStorage().setStatus(TaskStatus.failure(task.getId(), "Dummy task status failure for testing")); throw new ISE("task[%s] is not ready", task.getId()); @@ -472,7 +493,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase taskContainer.setStatusFuture(statusFuture); final ListenableFuture cleanupFuture = Futures.transform( statusFuture, - (Function) status -> { + status -> { shutdownTask(task); return status; }, @@ -481,15 +502,11 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase return cleanupFuture; } - @Nullable - public String cancel(String taskId) + private void cancel(String taskId) { final TaskContainer taskContainer = tasks.remove(taskId); if (taskContainer != null && taskContainer.statusFuture != null) { taskContainer.statusFuture.cancel(true); - return taskId; - } else { - return null; } } @@ -542,23 +559,29 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase public ListenableFuture runTask(String taskId, Object taskObject) { final Task task = (Task) taskObject; - taskRunner.run(injectIfNeeded(task)); + taskRunner.runTask(injectIfNeeded(task)); return Futures.immediateFuture(null); } @Override - public ListenableFuture> taskReportAsMap(String taskId) + public ListenableFuture taskReportAsMap(String taskId) { - return Futures.immediateFuture(null); + return Futures.immediateFuture(getLiveReportsForTask(taskId)); } - public TaskReport.ReportMap getLiveReportsForTask(String taskId) + private TaskReport.ReportMap getLiveReportsForTask(String taskId) { - final Optional task = getTaskStorage().getTask(taskId); - if (!task.isPresent()) { + final Optional taskOptional = getTaskStorage().getTask(taskId); + if (!taskOptional.isPresent()) { return null; } - return ((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true); + + final Task task = taskOptional.get(); + if (task instanceof SinglePhaseSubTask) { + return ((SinglePhaseSubTask) task).doGetLiveReports(true); + } else { + return ((ParallelIndexSupervisorTask) task).doGetLiveReports(true); + } } public TaskContainer getTaskContainer(String taskId) @@ -566,6 +589,16 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase return taskRunner.getTaskContainer(taskId); } + public void keepTasksRunning() + { + taskRunner.setUseTaskFinishLatches(true); + } + + public void allowTasksToFinish() + { + taskRunner.setUseTaskFinishLatches(false); + } + public TaskStatus runAndWait(Task task) { return taskRunner.runAndWait(injectIfNeeded(task)); @@ -837,7 +870,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase { Map unparseableEvents = ImmutableMap.of("buildSegments", expectedUnparseableEvents); Map rowStats = ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals)); - return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( taskId, @@ -861,9 +893,9 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase TaskReport.ReportMap actualReports ) { - final Optional expectedReportOptional + final java.util.Optional expectedReportOptional = expectedReports.findReport("ingestionStatsAndErrors"); - final Optional actualReportOptional + final java.util.Optional actualReportOptional = actualReports.findReport("ingestionStatsAndErrors"); Assert.assertTrue(expectedReportOptional.isPresent()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java index b020c360ba8..9e910a66815 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java @@ -27,8 +27,8 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 6a4e5f90c64..d2ba0af0873 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -34,7 +34,7 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.SegmentAllocators; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index a340655dfcc..d763337978f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -34,6 +34,8 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; +import org.apache.druid.indexer.report.KillTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.IAE; @@ -481,7 +483,7 @@ public class ParallelIndexSupervisorTaskTest public void testGetTaskReportOk() throws Exception { final String taskId = "task"; - final Map report = ImmutableMap.of("foo", "bar"); + final TaskReport.ReportMap report = TaskReport.buildTaskReports(new KillTaskReport("taskId", null)); final OverlordClient client = mock(OverlordClient.class); expect(client.taskReportAsMap(taskId)).andReturn(Futures.immediateFuture(report)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java index 3edce650688..b46f3da5b1f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java @@ -20,8 +20,8 @@ package org.apache.druid.indexing.common.task.batch.parallel; import nl.jqno.equalsverifier.EqualsVerifier; -import org.apache.druid.indexing.common.KillTaskReport; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.KillTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.junit.Test; public class PushedSegmentsReportTest diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index bc2e38f798e..143e0b0474a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -28,8 +28,11 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.Tasks; @@ -335,6 +338,38 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY); } + @Test + public void testGetRunningTaskReports() throws Exception + { + final ParallelIndexSupervisorTask task = newTask( + Intervals.of("2017-12/P1M"), + Granularities.DAY, + false, + true + ); + task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + task.addToContext(DISABLE_TASK_INJECT_CONTEXT_KEY, true); + + // Keep tasks running until finish is triggered + getIndexingServiceClient().keepTasksRunning(); + getIndexingServiceClient().runTask(task.getId(), task); + + // Allow enough time for sub-tasks to be in running state + Thread.sleep(2000); + + // Fetch and verify live reports + TaskReport.ReportMap reportMap = task.doGetLiveReports(true); + IngestionStatsAndErrors statsAndErrors = ((IngestionStatsAndErrorsTaskReport) + reportMap.get("ingestionStatsAndErrors")).getPayload(); + Map rowStats = statsAndErrors.getRowStats(); + Assert.assertTrue(rowStats.containsKey("totals")); + + getIndexingServiceClient().allowTasksToFinish(); + + TaskStatus taskStatus = getIndexingServiceClient().waitToFinish(task, 2, TimeUnit.MINUTES); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + @Test public void testRunInParallelIngestNullColumn() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java index b88b31443e0..046d666b2a3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.config.FileTaskLogsConfig; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 350bc745036..30d2d289aba 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -25,8 +25,8 @@ import org.apache.druid.client.indexing.NoopOverlordClient; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java index 6082664d36b..0a108b41219 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java @@ -30,8 +30,8 @@ import org.apache.druid.client.indexing.NoopOverlordClient; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.TaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskConfig; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 1f7bd929d8a..eabb640c133 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.seekablestream; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.jsontype.NamedType; @@ -52,11 +51,11 @@ import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.error.DruidException; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestUtils; @@ -463,9 +462,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport { TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue( reportsFile, - new TypeReference() - { - } + TaskReport.ReportMap.class ); return IngestionStatsAndErrors.getPayloadFromTaskReports( taskReports diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java index 8a2891c162d..daaebe6590e 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java @@ -23,10 +23,10 @@ import com.google.common.collect.FluentIterable; import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.indexer.partitions.SecondaryPartitionType; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.TaskContextReport; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskContextReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask; diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java index d110eb5a4a9..0fe486407db 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java @@ -22,8 +22,8 @@ package org.apache.druid.testsEx.msq; import com.google.api.client.util.Preconditions; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; -import org.apache.druid.indexing.common.TaskContextReport; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskContextReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Yielder; @@ -46,7 +46,6 @@ import org.junit.runner.RunWith; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; @RunWith(DruidTestRunner.class) @Category(MultiStageQuery.class) @@ -236,7 +235,7 @@ public class ITMultiStageQuery msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId()); - Map statusReport = msqHelper.fetchStatusReports(resultTaskStatus.getTaskId()); + TaskReport.ReportMap statusReport = msqHelper.fetchStatusReports(resultTaskStatus.getTaskId()); MSQTaskReport taskReport = (MSQTaskReport) statusReport.get(MSQTaskReport.REPORT_KEY); if (taskReport == null) { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index a0c364ced24..2f02c716a85 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -26,8 +26,8 @@ import com.google.inject.Inject; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.overlord.http.TaskPayloadResponse; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.java.util.common.ISE; @@ -280,7 +280,7 @@ public class OverlordResourceTestClient } } - public Map getTaskReport(String taskId) + public TaskReport.ReportMap getTaskReport(String taskId) { try { StatusResponseHolder response = makeRequest( @@ -293,9 +293,7 @@ public class OverlordResourceTestClient ); return jsonMapper.readValue( response.getContent(), - new TypeReference>() - { - } + TaskReport.ReportMap.class ); } catch (ISE e) { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java deleted file mode 100644 index f647a75e33a..00000000000 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.testing.clients.msq; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import org.apache.druid.guice.annotations.Json; -import org.apache.druid.indexing.common.TaskReport; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.java.util.http.client.response.StatusResponseHolder; -import org.apache.druid.msq.guice.MSQIndexingModule; -import org.apache.druid.testing.IntegrationTestingConfig; -import org.apache.druid.testing.clients.OverlordResourceTestClient; -import org.apache.druid.testing.guice.TestClient; -import org.jboss.netty.handler.codec.http.HttpMethod; - -import java.util.Map; - -/** - * Overlord resource client for MSQ Tasks - */ -public class MsqOverlordResourceTestClient extends OverlordResourceTestClient -{ - private ObjectMapper jsonMapper; - - @Inject - MsqOverlordResourceTestClient( - @Json ObjectMapper jsonMapper, - @TestClient HttpClient httpClient, - IntegrationTestingConfig config - ) - { - super(jsonMapper, httpClient, config); - this.jsonMapper = jsonMapper; - this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules()); - } - - public Map getMsqTaskReport(String taskId) - { - try { - StatusResponseHolder response = makeRequest( - HttpMethod.GET, - StringUtils.format( - "%s%s", - getIndexerURL(), - StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId)) - ) - ); - return jsonMapper.readValue(response.getContent(), new TypeReference>() - { - }); - } - catch (RuntimeException e) { - throw e; - } - catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java index e98793b3c8c..c5fc437fc9c 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java @@ -27,21 +27,22 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.guice.MSQIndexingModule; import org.apache.druid.msq.indexing.report.MSQResultsReport; import org.apache.druid.msq.indexing.report.MSQTaskReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; import org.apache.druid.msq.sql.SqlTaskStatus; import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.OverlordResourceTestClient; import org.apache.druid.testing.clients.SqlResourceTestClient; -import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.testng.Assert; @@ -65,7 +66,7 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper fetchStatusReports(String taskId) + public TaskReport.ReportMap fetchStatusReports(String taskId) { - return overlordClient.getMsqTaskReport(taskId); + return overlordClient.getTaskReport(taskId); } /** diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index f4a2cdca4b5..54eb8670adc 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -23,10 +23,10 @@ import com.google.common.collect.FluentIterable; import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.indexer.partitions.SecondaryPartitionType; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.TaskContextReport; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskContextReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 9bfc2ad02e5..90381bee143 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -21,9 +21,9 @@ package org.apache.druid.tests.indexer; import com.google.inject.Inject; import org.apache.commons.io.IOUtils; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexer.report.IngestionStatsAndErrors; +import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.GranularityType; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java b/processing/src/main/java/org/apache/druid/indexer/report/IngestionStatsAndErrors.java similarity index 99% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java rename to processing/src/main/java/org/apache/druid/indexer/report/IngestionStatsAndErrors.java index d55f5ed0d19..fe656694788 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java +++ b/processing/src/main/java/org/apache/druid/indexer/report/IngestionStatsAndErrors.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.common; +package org.apache.druid.indexer.report; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java b/processing/src/main/java/org/apache/druid/indexer/report/IngestionStatsAndErrorsTaskReport.java similarity index 98% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java rename to processing/src/main/java/org/apache/druid/indexer/report/IngestionStatsAndErrorsTaskReport.java index 0122a8d3ffb..769084d43f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java +++ b/processing/src/main/java/org/apache/druid/indexer/report/IngestionStatsAndErrorsTaskReport.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.common; +package org.apache.druid.indexer.report; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java similarity index 98% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java rename to processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java index f78b3d1d162..c0736a9e1a9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java +++ b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.common; +package org.apache.druid.indexer.report; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java b/processing/src/main/java/org/apache/druid/indexer/report/SingleFileTaskReportFileWriter.java similarity index 98% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java rename to processing/src/main/java/org/apache/druid/indexer/report/SingleFileTaskReportFileWriter.java index 79f880d3076..d862b224d86 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java +++ b/processing/src/main/java/org/apache/druid/indexer/report/SingleFileTaskReportFileWriter.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.common; +package org.apache.druid.indexer.report; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.FileUtils; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java b/processing/src/main/java/org/apache/druid/indexer/report/TaskContextReport.java similarity index 98% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java rename to processing/src/main/java/org/apache/druid/indexer/report/TaskContextReport.java index 743f216908d..cd27fa002d7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java +++ b/processing/src/main/java/org/apache/druid/indexer/report/TaskContextReport.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.common; +package org.apache.druid.indexer.report; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java b/processing/src/main/java/org/apache/druid/indexer/report/TaskReport.java similarity index 95% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java rename to processing/src/main/java/org/apache/druid/indexer/report/TaskReport.java index adada4f708d..30caf388a3f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java +++ b/processing/src/main/java/org/apache/druid/indexer/report/TaskReport.java @@ -17,13 +17,13 @@ * under the License. */ -package org.apache.druid.indexing.common; +package org.apache.druid.indexer.report; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.common.base.Optional; import java.util.LinkedHashMap; +import java.util.Optional; /** * TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and @@ -74,7 +74,7 @@ public interface TaskReport @SuppressWarnings("unchecked") public Optional findReport(String reportKey) { - return Optional.fromNullable((T) get(reportKey)); + return Optional.ofNullable((T) get(reportKey)); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java b/processing/src/main/java/org/apache/druid/indexer/report/TaskReportFileWriter.java similarity index 96% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java rename to processing/src/main/java/org/apache/druid/indexer/report/TaskReportFileWriter.java index 972f0b010f3..bb3ebcd0394 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java +++ b/processing/src/main/java/org/apache/druid/indexer/report/TaskReportFileWriter.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.common; +package org.apache.druid.indexer.report; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java new file mode 100644 index 00000000000..d71964eed66 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.report; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; +import org.apache.druid.indexer.IngestionState; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.segment.incremental.ParseExceptionReport; +import org.apache.druid.segment.incremental.RowIngestionMetersTotals; +import org.apache.druid.segment.incremental.RowMeters; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class TaskReportSerdeTest +{ + private final ObjectMapper jsonMapper; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + public TaskReportSerdeTest() + { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.registerSubtypes(ExceptionalTaskReport.class); + } + + @Test + public void testSerdeOfIngestionReport() throws Exception + { + IngestionStatsAndErrorsTaskReport originalReport = buildTestIngestionReport(); + String reportJson = jsonMapper.writeValueAsString(originalReport); + TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class); + + Assert.assertTrue(deserialized instanceof IngestionStatsAndErrorsTaskReport); + + IngestionStatsAndErrorsTaskReport deserializedReport = (IngestionStatsAndErrorsTaskReport) deserialized; + Assert.assertEquals(originalReport, deserializedReport); + } + + @Test + public void testSerdeOfKillTaskReport() throws Exception + { + KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3)); + String reportJson = jsonMapper.writeValueAsString(originalReport); + TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class); + + Assert.assertTrue(deserialized instanceof KillTaskReport); + + KillTaskReport deserializedReport = (KillTaskReport) deserialized; + Assert.assertEquals(originalReport, deserializedReport); + } + + @Test + public void testSerdeOfTaskContextReport() throws Exception + { + TaskContextReport originalReport = new TaskContextReport( + "taskId", + ImmutableMap.of("key1", "value1", "key2", "value2") + ); + String reportJson = jsonMapper.writeValueAsString(originalReport); + TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class); + + Assert.assertTrue(deserialized instanceof TaskContextReport); + + TaskContextReport deserializedReport = (TaskContextReport) deserialized; + Assert.assertEquals(originalReport, deserializedReport); + } + + @Test + public void testWriteReportMapToFileAndRead() throws Exception + { + IngestionStatsAndErrorsTaskReport report1 = buildTestIngestionReport(); + final File reportFile = temporaryFolder.newFile(); + final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); + writer.setObjectMapper(jsonMapper); + TaskReport.ReportMap reportMap1 = TaskReport.buildTaskReports(report1); + writer.write("testID", reportMap1); + + TaskReport.ReportMap reportMap2 = jsonMapper.readValue(reportFile, TaskReport.ReportMap.class); + Assert.assertEquals(reportMap1, reportMap2); + } + + @Test + public void testWriteReportMapToStringAndRead() throws Exception + { + IngestionStatsAndErrorsTaskReport ingestionReport = buildTestIngestionReport(); + TaskReport.ReportMap reportMap = TaskReport.buildTaskReports(ingestionReport); + String json = jsonMapper.writeValueAsString(reportMap); + + TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(json, TaskReport.ReportMap.class); + Assert.assertEquals(reportMap, deserializedReportMap); + } + + @Test + @SuppressWarnings("unchecked") + public void testWritePlainMapAndReadAsReportMap() throws Exception + { + final long now = System.currentTimeMillis(); + final List buildUnparseableEvents = Arrays.asList( + new ParseExceptionReport("abc,def", "invalid timestamp", Arrays.asList("row: 1", "col: 1"), now), + new ParseExceptionReport("xyz,pqr", "invalid timestamp", Arrays.asList("row: 1", "col: 1"), now) + ); + final Map unparseableEvents + = ImmutableMap.of("determinePartitions", Collections.emptyList(), "buildSegments", buildUnparseableEvents); + + final Map emptyAverageMinuteMap = ImmutableMap.of( + "processed", 0, + "processedBytes", 0, + "unparseable", 0, + "thrownAway", 0, + "processedWithError", 0 + ); + + final Map emptyAverages = ImmutableMap.of( + "1m", emptyAverageMinuteMap, + "5m", emptyAverageMinuteMap, + "15m", emptyAverageMinuteMap + ); + + final Map expectedAverages + = ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", emptyAverages); + + final RowIngestionMetersTotals determinePartitionTotalStats + = RowMeters.with().errors(10).unparseable(1).thrownAway(1).bytes(2000).totalProcessed(100); + final RowIngestionMetersTotals buildSegmentTotalStats + = RowMeters.with().errors(5).unparseable(2).thrownAway(1).bytes(2500).totalProcessed(150); + final Map expectedTotals + = ImmutableMap.of("determinePartitions", determinePartitionTotalStats, "buildSegments", buildSegmentTotalStats); + + final Map expectedRowStats = ImmutableMap.of( + "movingAverages", expectedAverages, + "totals", expectedTotals + ); + + final Map expectedPayload = new HashMap<>(); + expectedPayload.put("ingestionState", IngestionState.COMPLETED); + expectedPayload.put("unparseableEvents", unparseableEvents); + expectedPayload.put("rowStats", expectedRowStats); + + final Map ingestionStatsAndErrors = new HashMap<>(); + ingestionStatsAndErrors.put("taskId", "abc"); + ingestionStatsAndErrors.put("payload", expectedPayload); + ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); + + final Map expectedReportMap = new HashMap<>(); + expectedReportMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); + + final String plainMapJson = jsonMapper.writeValueAsString(expectedReportMap); + + // Verify the top-level structure of the report + final TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(plainMapJson, TaskReport.ReportMap.class); + Optional ingestStatsReport = deserializedReportMap.findReport( + "ingestionStatsAndErrors"); + Assert.assertTrue(ingestStatsReport.isPresent()); + + Assert.assertEquals("ingestionStatsAndErrors", ingestStatsReport.get().getReportKey()); + Assert.assertEquals("abc", ingestStatsReport.get().getTaskId()); + + // Verify basic fields in the payload + final IngestionStatsAndErrors observedPayload = ingestStatsReport.get().getPayload(); + Assert.assertEquals(expectedPayload.get("ingestionState"), observedPayload.getIngestionState()); + Assert.assertNull(observedPayload.getSegmentsRead()); + Assert.assertNull(observedPayload.getSegmentsPublished()); + Assert.assertNull(observedPayload.getErrorMsg()); + Assert.assertNull(observedPayload.getRecordsProcessed()); + + // Verify stats and unparseable events + final Map observedRowStats = observedPayload.getRowStats(); + Assert.assertEquals(expectedAverages, observedRowStats.get("movingAverages")); + + final Map observedTotals = (Map) observedRowStats.get("totals"); + verifyTotalRowStats(observedTotals, determinePartitionTotalStats, buildSegmentTotalStats); + verifyUnparseableEvents(observedPayload.getUnparseableEvents(), buildUnparseableEvents); + + // Re-serialize report map and deserialize as plain map + final String reportMapJson = jsonMapper.writeValueAsString(deserializedReportMap); + + final Map deserializedPlainMap = jsonMapper.readValue( + reportMapJson, + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + + final Map ingestStatsReport2 + = (Map) deserializedPlainMap.get("ingestionStatsAndErrors"); + + // Verify basic fields in the payload + final Map observedPayload2 = (Map) ingestStatsReport2.get("payload"); + Assert.assertEquals(expectedPayload.get("ingestionState").toString(), observedPayload2.get("ingestionState")); + Assert.assertNull(observedPayload2.get("segmentsRead")); + Assert.assertNull(observedPayload2.get("segmentsPublished")); + Assert.assertNull(observedPayload2.get("errorMsg")); + Assert.assertNull(observedPayload2.get("recordsProcessed")); + + // Verify stats and unparseable events + final Map observedRowStats2 = (Map) observedPayload2.get("rowStats"); + Assert.assertEquals(expectedAverages, observedRowStats2.get("movingAverages")); + + final Map observedTotals2 = (Map) observedRowStats2.get("totals"); + verifyTotalRowStats(observedTotals2, determinePartitionTotalStats, buildSegmentTotalStats); + verifyUnparseableEvents( + (Map) observedPayload2.get("unparseableEvents"), + buildUnparseableEvents + ); + } + + @Test + public void testSerializationOnMissingPartitionStats() throws Exception + { + String json = "{\n" + + " \"type\": \"ingestionStatsAndErrors\",\n" + + " \"taskId\": \"ingestionStatsAndErrors\",\n" + + " \"payload\": {\n" + + " \"ingestionState\": \"COMPLETED\",\n" + + " \"unparseableEvents\": {\n" + + " \"hello\": \"world\"\n" + + " },\n" + + " \"rowStats\": {\n" + + " \"number\": 1234\n" + + " },\n" + + " \"errorMsg\": \"an error message\",\n" + + " \"segmentAvailabilityConfirmed\": true,\n" + + " \"segmentAvailabilityWaitTimeMs\": 1000\n" + + " }\n" + + "}"; + + IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport( + IngestionStatsAndErrorsTaskReport.REPORT_KEY, + new IngestionStatsAndErrors( + IngestionState.COMPLETED, + ImmutableMap.of( + "hello", "world" + ), + ImmutableMap.of( + "number", 1234 + ), + "an error message", + true, + 1000L, + null, + null, + null + ) + ); + + TaskReport deserialized = jsonMapper.readValue(json, TaskReport.class); + Assert.assertEquals(expected.getTaskId(), deserialized.getTaskId()); + Assert.assertEquals(expected, deserialized); + } + + @Test + public void testExceptionWhileWritingReport() throws Exception + { + final File reportFile = temporaryFolder.newFile(); + final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); + writer.setObjectMapper(jsonMapper); + writer.write("theTask", TaskReport.buildTaskReports(new ExceptionalTaskReport())); + + // Read the file, ensure it's incomplete and not valid JSON. This allows callers to determine the report was + // not complete when written. + Assert.assertEquals( + "{\"report\":{\"type\":\"exceptional\"", + Files.asCharSource(reportFile, StandardCharsets.UTF_8).read() + ); + } + + private IngestionStatsAndErrorsTaskReport buildTestIngestionReport() + { + return new IngestionStatsAndErrorsTaskReport( + "testID", + new IngestionStatsAndErrors( + IngestionState.BUILD_SEGMENTS, + Collections.singletonMap("hello", "world"), + Collections.singletonMap("number", 1234), + "an error message", + true, + 1000L, + Collections.singletonMap("PartitionA", 5000L), + 5L, + 10L + ) + ); + } + + private void verifyUnparseableEvents( + Map observed, + List buildSegmentUnparseableEvents + ) + { + Assert.assertEquals(Collections.emptyList(), observed.get("determinePartitions")); + + final List observedBuildSegmentUnparseableEvents + = (List) observed.get("buildSegments"); + Assert.assertEquals(2, observedBuildSegmentUnparseableEvents.size()); + + for (int i = 0; i < buildSegmentUnparseableEvents.size(); ++i) { + final ParseExceptionReport expectedEvent = buildSegmentUnparseableEvents.get(i); + final Object observedEvent = observedBuildSegmentUnparseableEvents.get(i); + Assert.assertEquals( + ImmutableMap.of( + "input", expectedEvent.getInput(), + "errorType", expectedEvent.getErrorType(), + "details", expectedEvent.getDetails(), + "timeOfExceptionMillis", expectedEvent.getTimeOfExceptionMillis() + ), + observedEvent + ); + } + } + + private void verifyTotalRowStats( + Map observedTotals, + RowIngestionMetersTotals determinePartitionTotalStats, + RowIngestionMetersTotals buildSegmentTotalStats + ) + { + Assert.assertEquals( + ImmutableMap.of( + "processed", (int) determinePartitionTotalStats.getProcessed(), + "processedBytes", (int) determinePartitionTotalStats.getProcessedBytes(), + "processedWithError", (int) determinePartitionTotalStats.getProcessedWithError(), + "thrownAway", (int) determinePartitionTotalStats.getThrownAway(), + "unparseable", (int) determinePartitionTotalStats.getUnparseable() + ), + observedTotals.get("determinePartitions") + ); + Assert.assertEquals( + ImmutableMap.of( + "processed", (int) buildSegmentTotalStats.getProcessed(), + "processedBytes", (int) buildSegmentTotalStats.getProcessedBytes(), + "processedWithError", (int) buildSegmentTotalStats.getProcessedWithError(), + "thrownAway", (int) buildSegmentTotalStats.getThrownAway(), + "unparseable", (int) buildSegmentTotalStats.getUnparseable() + ), + observedTotals.get("buildSegments") + ); + } + + /** + * Task report that throws an exception while being serialized. + */ + @JsonTypeName("exceptional") + private static class ExceptionalTaskReport implements TaskReport + { + @Override + @JsonProperty + public String getTaskId() + { + throw new UnsupportedOperationException("cannot serialize task ID"); + } + + @Override + public String getReportKey() + { + return "report"; + } + + @Override + @JsonProperty + public Object getPayload() + { + throw new UnsupportedOperationException("cannot serialize payload"); + } + } +} diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 57fab2fff4a..2d4dfe4aae7 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -30,6 +30,7 @@ import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; @@ -166,7 +167,7 @@ public interface OverlordClient * Returns a {@link org.apache.druid.rpc.HttpResponseException} with code * {@link javax.ws.rs.core.Response.Status#NOT_FOUND} if there is no report available for some reason. */ - ListenableFuture> taskReportAsMap(String taskId); + ListenableFuture taskReportAsMap(String taskId); /** * Returns the payload for a task as an instance of {@link ClientTaskQuery}. This method only works for tasks diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index d7fab4b75fa..35276aa723d 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -32,6 +32,7 @@ import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; @@ -214,7 +215,7 @@ public class OverlordClientImpl implements OverlordClient } @Override - public ListenableFuture> taskReportAsMap(String taskId) + public ListenableFuture taskReportAsMap(String taskId) { final String path = StringUtils.format("/druid/indexer/v1/task/%s/reports", StringUtils.urlEncode(taskId)); @@ -223,7 +224,7 @@ public class OverlordClientImpl implements OverlordClient new RequestBuilder(HttpMethod.GET, path), new BytesFullResponseHandler() ), - holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT) + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), TaskReport.ReportMap.class) ); } diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java index 42ca59ffee7..10ebeb53af2 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java @@ -22,6 +22,7 @@ package org.apache.druid.client.indexing; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; @@ -84,7 +85,7 @@ public class NoopOverlordClient implements OverlordClient } @Override - public ListenableFuture> taskReportAsMap(String taskId) + public ListenableFuture taskReportAsMap(String taskId) { throw new UnsupportedOperationException(); } diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 047f94b125c..57bab1fed0d 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -35,6 +35,8 @@ import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.report.KillTaskReport; +import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -289,7 +291,7 @@ public class OverlordClientImplTest public void test_taskReportAsMap() throws Exception { final String taskId = "testTaskId"; - final Map response = ImmutableMap.of("test", "value"); + final TaskReport.ReportMap response = TaskReport.buildTaskReports(new KillTaskReport("taskId", null)); serviceClient.expectAndRespond( new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/testTaskId/reports"), @@ -298,7 +300,7 @@ public class OverlordClientImplTest jsonMapper.writeValueAsBytes(response) ); - final ListenableFuture> future = overlordClient.taskReportAsMap(taskId); + final ListenableFuture future = overlordClient.taskReportAsMap(taskId); Assert.assertEquals(response, future.get()); } @@ -319,7 +321,7 @@ public class OverlordClientImplTest ) ); - final ListenableFuture> future = overlordClient.taskReportAsMap(taskId); + final ListenableFuture future = overlordClient.taskReportAsMap(taskId); final ExecutionException e = Assert.assertThrows( ExecutionException.class, @@ -345,7 +347,7 @@ public class OverlordClientImplTest StringUtils.toUtf8("{}") ); - final Map actualResponse = + final TaskReport.ReportMap actualResponse = FutureUtils.getUnchecked(overlordClient.taskReportAsMap(taskID), true); Assert.assertEquals(Collections.emptyMap(), actualResponse); } diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 7b90b2eddcd..715a8078c2a 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -56,8 +56,8 @@ import org.apache.druid.guice.annotations.AttemptId; import org.apache.druid.guice.annotations.Parent; import org.apache.druid.guice.annotations.RemoteChatHandler; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReportFileWriter; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.ThreadingTaskRunner; import org.apache.druid.indexing.worker.Worker; diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 92699694dda..6212cb8b9b7 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -71,10 +71,10 @@ import org.apache.druid.guice.annotations.AttemptId; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Parent; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; -import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory; import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;