Fix OverlordClient to read reports as a concrete `ReportMap` (#16226)

Follow up to #16217 

Changes:
- Update `OverlordClient.getReportAsMap()` to return `TaskReport.ReportMap`
- Move the following classes to `org.apache.druid.indexer.report` in the `druid-processing` module
  - `TaskReport`
  - `KillTaskReport`
  - `IngestionStatsAndErrorsTaskReport`
  - `TaskContextReport`
  - `TaskReportFileWriter`
  - `SingleFileTaskReportFileWriter`
  - `TaskReportSerdeTest`
- Remove `MsqOverlordResourceTestClient` as it had only one method
which is already present in `OverlordResourceTestClient` itself
This commit is contained in:
Kashif Faraz 2024-04-15 08:00:59 +05:30 committed by GitHub
parent 041d0bff5e
commit 81d7b6ebe1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
78 changed files with 714 additions and 569 deletions

View File

@ -54,7 +54,7 @@ import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.IndexTaskTest;

View File

@ -43,7 +43,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;

View File

@ -22,7 +22,7 @@ package org.apache.druid.msq.exec;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.MSQControllerTask;

View File

@ -22,7 +22,7 @@ package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

View File

@ -68,11 +68,11 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;

View File

@ -24,7 +24,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;

View File

@ -19,7 +19,7 @@
package org.apache.druid.msq.indexing.client;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;

View File

@ -22,7 +22,7 @@ package org.apache.druid.msq.indexing.report;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
@JsonTypeName(MSQTaskReport.REPORT_KEY)
public class MSQTaskReport implements TaskReport

View File

@ -512,12 +512,11 @@ public class SqlStatementResource
)
{
if (sqlStatementState == SqlStatementState.SUCCESS) {
Map<String, Object> payload =
MSQTaskReportPayload msqTaskReportPayload =
SqlStatementResourceHelper.getPayload(contactOverlord(
overlordClient.taskReportAsMap(queryId),
queryId
));
MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(payload, MSQTaskReportPayload.class);
Optional<List<PageInformation>> pageList = SqlStatementResourceHelper.populatePageList(
msqTaskReportPayload,
msqDestination
@ -607,7 +606,8 @@ public class SqlStatementResource
taskResponse,
statusPlus,
sqlStatementState,
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId),
jsonMapper
);
} else {
Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
@ -735,8 +735,9 @@ public class SqlStatementResource
);
}
MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)), MSQTaskReportPayload.class);
MSQTaskReportPayload msqTaskReportPayload = SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
);
if (msqTaskReportPayload.getResults().getResultYielder() == null) {
results = Optional.empty();
@ -746,8 +747,9 @@ public class SqlStatementResource
} else if (msqControllerTask.getQuerySpec().getDestination() instanceof DurableStorageMSQDestination) {
MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)), MSQTaskReportPayload.class);
MSQTaskReportPayload msqTaskReportPayload = SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
);
List<PageInformation> pages =
SqlStatementResourceHelper.populatePageList(

View File

@ -32,9 +32,11 @@ import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
@ -45,7 +47,10 @@ import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.sql.SqlStatementState;
@ -243,12 +248,13 @@ public class SqlStatementResourceHelper
TaskStatusResponse taskResponse,
TaskStatusPlus statusPlus,
SqlStatementState sqlStatementState,
Map<String, Object> msqPayload
TaskReport.ReportMap msqPayload,
ObjectMapper jsonMapper
)
{
Map<String, Object> exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload));
Map<String, Object> exception = getMap(exceptionDetails, "error");
if (exceptionDetails == null || exception == null) {
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload));
final MSQFault fault = exceptionDetails == null ? null : exceptionDetails.getFault();
if (exceptionDetails == null || fault == null) {
return Optional.of(new SqlStatementResult(
queryId,
sqlStatementState,
@ -258,18 +264,15 @@ public class SqlStatementResourceHelper
null,
DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build("%s", taskResponse.getStatus().getErrorMsg()).toErrorResponse()
.build("%s", taskResponse.getStatus().getErrorMsg())
.toErrorResponse()
));
}
final String errorMessage = String.valueOf(exception.getOrDefault("errorMessage", statusPlus.getErrorMsg()));
exception.remove("errorMessage");
String errorCode = String.valueOf(exception.getOrDefault("errorCode", "unknown"));
exception.remove("errorCode");
Map<String, String> stringException = new HashMap<>();
for (Map.Entry<String, Object> exceptionKeys : exception.entrySet()) {
stringException.put(exceptionKeys.getKey(), String.valueOf(exceptionKeys.getValue()));
}
final String errorMessage = fault.getErrorMessage() == null ? statusPlus.getErrorMsg() : fault.getErrorMessage();
final String errorCode = fault.getErrorCode() == null ? "unknown" : fault.getErrorCode();
final Map<String, String> exceptionContext = buildExceptionContext(fault, jsonMapper);
return Optional.of(new SqlStatementResult(
queryId,
sqlStatementState,
@ -285,7 +288,7 @@ public class SqlStatementResourceHelper
DruidException ex = bob.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build(errorMessage);
ex.withContext(stringException);
ex.withContext(exceptionContext);
return ex;
}
}).toErrorResponse()
@ -361,22 +364,42 @@ public class SqlStatementResourceHelper
return null;
}
public static Map<String, Object> getQueryExceptionDetails(Map<String, Object> payload)
@Nullable
private static MSQErrorReport getQueryExceptionDetails(MSQTaskReportPayload payload)
{
return getMap(getMap(payload, "status"), "errorReport");
return payload == null ? null : payload.getStatus().getErrorReport();
}
public static Map<String, Object> getMap(Map<String, Object> map, String key)
@Nullable
public static MSQTaskReportPayload getPayload(TaskReport.ReportMap reportMap)
{
if (map == null) {
if (reportMap == null) {
return null;
}
return (Map<String, Object>) map.get(key);
Optional<MSQTaskReport> report = reportMap.findReport("multiStageQuery");
return report.map(MSQTaskReport::getPayload).orElse(null);
}
public static Map<String, Object> getPayload(Map<String, Object> results)
private static Map<String, String> buildExceptionContext(MSQFault fault, ObjectMapper mapper)
{
Map<String, Object> msqReport = getMap(results, "multiStageQuery");
return getMap(msqReport, "payload");
try {
final Map<String, Object> msqFaultAsMap = new HashMap<>(
mapper.readValue(
mapper.writeValueAsBytes(fault),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
)
);
msqFaultAsMap.remove("errorCode");
msqFaultAsMap.remove("errorMessage");
final Map<String, String> exceptionContext = new HashMap<>();
msqFaultAsMap.forEach((key, value) -> exceptionContext.put(key, String.valueOf(value)));
return exceptionContext;
}
catch (Exception e) {
throw DruidException.defensive("Could not read MSQFault[%s] as a map: [%s]", fault, e.getMessage());
}
}
}

View File

@ -22,8 +22,8 @@ package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;

View File

@ -19,8 +19,8 @@
package org.apache.druid.msq.indexing.client;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.indexing.MSQControllerTask;

View File

@ -19,7 +19,6 @@
package org.apache.druid.msq.indexing.report;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -28,8 +27,8 @@ import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
@ -243,9 +242,7 @@ public class MSQTaskReportTest
final TaskReport.ReportMap reportMap = mapper.readValue(
reportFile,
new TypeReference<TaskReport.ReportMap>()
{
}
TaskReport.ReportMap.class
);
final MSQTaskReport report2 = (MSQTaskReport) reportMap.get(MSQTaskReport.REPORT_KEY);

View File

@ -19,8 +19,6 @@
package org.apache.druid.msq.sql.resources;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -33,7 +31,7 @@ import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@ -45,7 +43,6 @@ import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
@ -105,28 +102,23 @@ import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
public class SqlStatementResourceTest extends MSQTestBase
{
public static final DateTime CREATED_TIME = DateTimes.of("2023-05-31T12:00Z");
private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
private static final String ACCEPTED_SELECT_MSQ_QUERY = "QUERY_ID_1";
private static final String RUNNING_SELECT_MSQ_QUERY = "QUERY_ID_2";
private static final String FINISHED_SELECT_MSQ_QUERY = "QUERY_ID_3";
private static final String ERRORED_SELECT_MSQ_QUERY = "QUERY_ID_4";
private static final String RUNNING_NON_MSQ_TASK = "QUERY_ID_5";
private static final String FAILED_NON_MSQ_TASK = "QUERY_ID_6";
private static final String FINISHED_NON_MSQ_TASK = "QUERY_ID_7";
private static final String ACCEPTED_INSERT_MSQ_TASK = "QUERY_ID_8";
private static final String RUNNING_INSERT_MSQ_QUERY = "QUERY_ID_9";
private static final String FINISHED_INSERT_MSQ_QUERY = "QUERY_ID_10";
private static final String ERRORED_INSERT_MSQ_QUERY = "QUERY_ID_11";
@ -235,7 +227,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new Object[]{234, "foo1", "bar1"}
);
private final MSQTaskReport selectTaskReport = new MSQTaskReport(
private final Supplier<MSQTaskReport> selectTaskReport = () -> new MSQTaskReport(
FINISHED_SELECT_MSQ_QUERY,
new MSQTaskReportPayload(
new MSQStatusReport(
@ -350,10 +342,10 @@ public class SqlStatementResourceTest extends MSQTestBase
private static final String FAILURE_MSG = "failure msg";
private static SqlStatementResource resource;
private static String SUPERUSER = "superuser";
private static String STATE_R_USER = "stateR";
private static String STATE_W_USER = "stateW";
private static String STATE_RW_USER = "stateRW";
private static final String SUPERUSER = "superuser";
private static final String STATE_R_USER = "stateR";
private static final String STATE_W_USER = "stateW";
private static final String STATE_RW_USER = "stateRW";
private AuthorizerMapper authorizerMapper = new AuthorizerMapper(null)
{
@ -394,12 +386,8 @@ public class SqlStatementResourceTest extends MSQTestBase
@Mock
private OverlordClient overlordClient;
final ObjectMapper mapper = TestHelper.makeJsonMapper()
.registerModules(new MSQIndexingModule().getJacksonModules());
private void setupMocks(OverlordClient indexingServiceClient) throws JsonProcessingException
private void setupMocks(OverlordClient indexingServiceClient)
{
Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ACCEPTED_SELECT_MSQ_QUERY)))
.thenReturn(Futures.immediateFuture(new TaskStatusResponse(ACCEPTED_SELECT_MSQ_QUERY, new TaskStatusPlus(
ACCEPTED_SELECT_MSQ_QUERY,
@ -466,13 +454,7 @@ public class SqlStatementResourceTest extends MSQTestBase
Mockito.when(indexingServiceClient.taskReportAsMap(FINISHED_SELECT_MSQ_QUERY))
.thenReturn(Futures.immediateFuture(mapper.readValue(
mapper.writeValueAsString(TaskReport.buildTaskReports(selectTaskReport)),
new TypeReference<Map<String, Object>>()
{
}
)));
.thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get())));
Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY)))
.thenReturn(Futures.immediateFuture(new TaskStatusResponse(ERRORED_SELECT_MSQ_QUERY, new TaskStatusPlus(
@ -605,13 +587,7 @@ public class SqlStatementResourceTest extends MSQTestBase
))));
Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(FINISHED_INSERT_MSQ_QUERY)))
.thenReturn(Futures.immediateFuture(mapper.readValue(
mapper.writeValueAsString(TaskReport.buildTaskReports(
MSQ_INSERT_TASK_REPORT)),
new TypeReference<Map<String, Object>>()
{
}
)));
.thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT)));
Mockito.when(indexingServiceClient.taskPayload(FINISHED_INSERT_MSQ_QUERY))
.thenReturn(Futures.immediateFuture(new TaskPayloadResponse(

View File

@ -31,7 +31,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;

View File

@ -19,7 +19,6 @@
package org.apache.druid.msq.test;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -31,7 +30,7 @@ import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
@ -42,7 +41,6 @@ import org.apache.druid.msq.indexing.MSQControllerTask;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -121,22 +119,9 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient
}
@Override
public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId)
{
SettableFuture<Map<String, Object>> future = SettableFuture.create();
try {
future.set(
objectMapper.readValue(
objectMapper.writeValueAsBytes(getReportForTask(taskId)),
new TypeReference<Map<String, Object>>()
{
}
));
}
catch (IOException e) {
throw new RuntimeException(e);
}
return future;
return Futures.immediateFuture(getReportForTask(taskId));
}
@Override

View File

@ -22,8 +22,8 @@ package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.io.Closer;

View File

@ -20,6 +20,9 @@
package org.apache.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.logger.Logger;

View File

@ -32,6 +32,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;

View File

@ -34,6 +34,7 @@ import org.apache.druid.guice.annotations.AttemptId;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.RemoteChatHandler;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;

View File

@ -31,13 +31,13 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;

View File

@ -40,16 +40,16 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;

View File

@ -48,10 +48,10 @@ import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;

View File

@ -43,9 +43,9 @@ import org.apache.druid.indexer.MetadataStorageUpdaterJobHandler;
import org.apache.druid.indexer.TaskMetricsGetter;
import org.apache.druid.indexer.TaskMetricsUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;

View File

@ -48,9 +48,9 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;

View File

@ -29,9 +29,9 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;

View File

@ -21,7 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import java.util.List;

View File

@ -20,7 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import java.util.List;
import java.util.Objects;

View File

@ -40,10 +40,10 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
@ -1683,21 +1683,24 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
for (String runningTaskId : runningTaskIds) {
try {
final Map<String, Object> report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
final TaskReport.ReportMap report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
if (report == null || report.isEmpty()) {
// task does not have a running report yet
continue;
}
Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
final IngestionStatsAndErrorsTaskReport ingestionStatsReport
= (IngestionStatsAndErrorsTaskReport) report.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
final IngestionStatsAndErrors payload = ingestionStatsReport.getPayload();
Map<String, Object> rowStats = payload.getRowStats();
Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);
if (includeUnparseable) {
Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
Map<String, Object> taskUnparseableEvents = payload.getUnparseableEvents();
List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(buildSegmentsUnparseableEvents);
@ -1804,7 +1807,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
}
@VisibleForTesting
public TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
{
Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents =
doGetRowStatsAndUnparseableEvents(isFullReport, true);
@ -1846,7 +1849,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
*/
@Nullable
@VisibleForTesting
static Map<String, Object> getTaskReport(final OverlordClient overlordClient, final String taskId)
static TaskReport.ReportMap getTaskReport(final OverlordClient overlordClient, final String taskId)
throws InterruptedException, ExecutionException
{
try {

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;

View File

@ -27,7 +27,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;

View File

@ -25,8 +25,8 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.BatchAppenderators;

View File

@ -26,8 +26,8 @@ import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;

View File

@ -22,7 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.timeline.DataSegment;
import java.util.Objects;

View File

@ -32,8 +32,8 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -542,7 +542,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
return Response.ok(doGetRowStats(full != null)).build();
}
private TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
{
return buildLiveIngestionStatsReport(
ingestionState,

View File

@ -33,8 +33,8 @@ import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;

View File

@ -51,14 +51,14 @@ import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;

View File

@ -52,10 +52,10 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;

View File

@ -39,7 +39,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;

View File

@ -42,12 +42,12 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;

View File

@ -45,9 +45,9 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.config.TaskConfig;

View File

@ -36,11 +36,11 @@ import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.RegexParseSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;

View File

@ -19,15 +19,14 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
@ -1134,9 +1133,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
try {
Object payload = getObjectMapper().readValue(
taskRunner.getTaskReportsFile(),
new TypeReference<TaskReport.ReportMap>()
{
}
TaskReport.ReportMap.class
).get(KillTaskReport.REPORT_KEY).getPayload();
return getObjectMapper().convertValue(payload, KillTaskReport.Stats.class);
}

View File

@ -20,8 +20,8 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexer.report.TaskReportFileWriter;
public class NoopTestTaskReportFileWriter implements TaskReportFileWriter
{

View File

@ -19,7 +19,7 @@
package org.apache.druid.indexing.common.task;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import java.util.ArrayList;
import java.util.LinkedHashMap;

View File

@ -1,233 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
public class TaskReportSerdeTest
{
private final ObjectMapper jsonMapper;
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
public TaskReportSerdeTest()
{
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
jsonMapper.registerSubtypes(ExceptionalTaskReport.class);
}
@Test
public void testSerdeOfIngestionReport() throws Exception
{
IngestionStatsAndErrorsTaskReport originalReport = buildTestIngestionReport();
String reportJson = jsonMapper.writeValueAsString(originalReport);
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
Assert.assertTrue(deserialized instanceof IngestionStatsAndErrorsTaskReport);
IngestionStatsAndErrorsTaskReport deserializedReport = (IngestionStatsAndErrorsTaskReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
}
@Test
public void testSerdeOfKillTaskReport() throws Exception
{
KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3));
String reportJson = jsonMapper.writeValueAsString(originalReport);
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
Assert.assertTrue(deserialized instanceof KillTaskReport);
KillTaskReport deserializedReport = (KillTaskReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
}
@Test
public void testSerdeOfTaskContextReport() throws Exception
{
TaskContextReport originalReport = new TaskContextReport(
"taskId",
ImmutableMap.of("key1", "value1", "key2", "value2")
);
String reportJson = jsonMapper.writeValueAsString(originalReport);
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
Assert.assertTrue(deserialized instanceof TaskContextReport);
TaskContextReport deserializedReport = (TaskContextReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
}
@Test
public void testWriteReportMapToFileAndRead() throws Exception
{
IngestionStatsAndErrorsTaskReport report1 = buildTestIngestionReport();
final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper);
TaskReport.ReportMap reportMap1 = TaskReport.buildTaskReports(report1);
writer.write("testID", reportMap1);
TaskReport.ReportMap reportMap2 = jsonMapper.readValue(reportFile, TaskReport.ReportMap.class);
Assert.assertEquals(reportMap1, reportMap2);
}
@Test
public void testWriteReportMapToStringAndRead() throws Exception
{
IngestionStatsAndErrorsTaskReport ingestionReport = buildTestIngestionReport();
TaskReport.ReportMap reportMap = TaskReport.buildTaskReports(ingestionReport);
String json = jsonMapper.writeValueAsString(reportMap);
TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(json, TaskReport.ReportMap.class);
Assert.assertEquals(reportMap, deserializedReportMap);
}
@Test
public void testSerializationOnMissingPartitionStats() throws Exception
{
String json = "{\n"
+ " \"type\": \"ingestionStatsAndErrors\",\n"
+ " \"taskId\": \"ingestionStatsAndErrors\",\n"
+ " \"payload\": {\n"
+ " \"ingestionState\": \"COMPLETED\",\n"
+ " \"unparseableEvents\": {\n"
+ " \"hello\": \"world\"\n"
+ " },\n"
+ " \"rowStats\": {\n"
+ " \"number\": 1234\n"
+ " },\n"
+ " \"errorMsg\": \"an error message\",\n"
+ " \"segmentAvailabilityConfirmed\": true,\n"
+ " \"segmentAvailabilityWaitTimeMs\": 1000\n"
+ " }\n"
+ "}";
IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport(
IngestionStatsAndErrorsTaskReport.REPORT_KEY,
new IngestionStatsAndErrors(
IngestionState.COMPLETED,
ImmutableMap.of(
"hello", "world"
),
ImmutableMap.of(
"number", 1234
),
"an error message",
true,
1000L,
null,
null,
null
)
);
Assert.assertEquals(expected, jsonMapper.readValue(
json,
new TypeReference<TaskReport>()
{
}
));
}
@Test
public void testExceptionWhileWritingReport() throws Exception
{
final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper);
writer.write("theTask", TaskReport.buildTaskReports(new ExceptionalTaskReport()));
// Read the file, ensure it's incomplete and not valid JSON. This allows callers to determine the report was
// not complete when written.
Assert.assertEquals(
"{\"report\":{\"type\":\"exceptional\"",
Files.asCharSource(reportFile, StandardCharsets.UTF_8).read()
);
}
private IngestionStatsAndErrorsTaskReport buildTestIngestionReport()
{
return new IngestionStatsAndErrorsTaskReport(
"testID",
new IngestionStatsAndErrors(
IngestionState.BUILD_SEGMENTS,
Collections.singletonMap("hello", "world"),
Collections.singletonMap("number", 1234),
"an error message",
true,
1000L,
Collections.singletonMap("PartitionA", 5000L),
5L,
10L
)
);
}
/**
* Task report that throws an exception while being serialized.
*/
@JsonTypeName("exceptional")
private static class ExceptionalTaskReport implements TaskReport
{
@Override
@JsonProperty
public String getTaskId()
{
throw new UnsupportedOperationException("cannot serialize task ID");
}
@Override
public String getReportKey()
{
return "report";
}
@Override
@JsonProperty
public Object getPayload()
{
throw new UnsupportedOperationException("cannot serialize payload");
}
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Preconditions;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
@ -28,9 +29,9 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.input.DruidInputSource;
@ -185,7 +186,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
TaskReport.ReportMap runTaskAndGetReports(Task task, TaskState expectedTaskStatus)
{
runTaskAndVerifyStatus(task, expectedTaskStatus);
return getIndexingServiceClient().getLiveReportsForTask(task.getId());
return FutureUtils.getUnchecked(getIndexingServiceClient().taskReportAsMap(task.getId()), true);
}
protected ParallelIndexSupervisorTask createTask(

View File

@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -49,13 +48,13 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -119,6 +118,7 @@ import org.junit.rules.TestName;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -128,6 +128,7 @@ import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -327,10 +328,12 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
private volatile Future<TaskStatus> statusFuture;
@MonotonicNonNull
private volatile TestLocalTaskActionClient actionClient;
private final CountDownLatch taskFinishLatch;
private TaskContainer(Task task)
private TaskContainer(Task task, CountDownLatch taskFinishLatch)
{
this.task = task;
this.taskFinishLatch = taskFinishLatch;
}
public Task getTask()
@ -356,6 +359,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
private final ScheduledExecutorService taskKiller = Execs.scheduledSingleThreaded("simple-threading-task-killer");
private final Set<String> killedSubtaskSpecs = new HashSet<>();
private volatile boolean useTaskFinishLatches = false;
private final List<CountDownLatch> allTaskLatches = new ArrayList<>();
SimpleThreadingTaskRunner(String threadNameBase)
{
@ -392,12 +397,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
taskKiller.shutdownNow();
}
public String run(Task task)
{
runTask(task);
return task.getId();
}
private TaskStatus runAndWait(Task task)
{
try {
@ -439,9 +438,29 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
return tasks.get(taskId);
}
private void setUseTaskFinishLatches(boolean useLatches)
{
this.useTaskFinishLatches = useLatches;
if (!useLatches) {
countDownAllTaskLatches();
}
}
private void countDownAllTaskLatches()
{
allTaskLatches.forEach(latch -> {
if (latch.getCount() > 0) {
latch.countDown();
}
});
}
private Future<TaskStatus> runTask(Task task)
{
final TaskContainer taskContainer = new TaskContainer(task);
final CountDownLatch taskFinishLatch = useTaskFinishLatches ? new CountDownLatch(1) : new CountDownLatch(0);
allTaskLatches.add(taskFinishLatch);
final TaskContainer taskContainer = new TaskContainer(task, taskFinishLatch);
if (tasks.put(task.getId(), taskContainer) != null) {
throw new ISE("Duplicate task ID[%s]", task.getId());
}
@ -457,7 +476,9 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
taskContainer.setActionClient(actionClient);
if (task.isReady(toolbox.getTaskActionClient())) {
return task.run(toolbox);
TaskStatus status = task.run(toolbox);
taskFinishLatch.await();
return status;
} else {
getTaskStorage().setStatus(TaskStatus.failure(task.getId(), "Dummy task status failure for testing"));
throw new ISE("task[%s] is not ready", task.getId());
@ -472,7 +493,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
taskContainer.setStatusFuture(statusFuture);
final ListenableFuture<TaskStatus> cleanupFuture = Futures.transform(
statusFuture,
(Function<TaskStatus, TaskStatus>) status -> {
status -> {
shutdownTask(task);
return status;
},
@ -481,15 +502,11 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
return cleanupFuture;
}
@Nullable
public String cancel(String taskId)
private void cancel(String taskId)
{
final TaskContainer taskContainer = tasks.remove(taskId);
if (taskContainer != null && taskContainer.statusFuture != null) {
taskContainer.statusFuture.cancel(true);
return taskId;
} else {
return null;
}
}
@ -542,23 +559,29 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public ListenableFuture<Void> runTask(String taskId, Object taskObject)
{
final Task task = (Task) taskObject;
taskRunner.run(injectIfNeeded(task));
taskRunner.runTask(injectIfNeeded(task));
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId)
{
return Futures.immediateFuture(null);
return Futures.immediateFuture(getLiveReportsForTask(taskId));
}
public TaskReport.ReportMap getLiveReportsForTask(String taskId)
private TaskReport.ReportMap getLiveReportsForTask(String taskId)
{
final Optional<Task> task = getTaskStorage().getTask(taskId);
if (!task.isPresent()) {
final Optional<Task> taskOptional = getTaskStorage().getTask(taskId);
if (!taskOptional.isPresent()) {
return null;
}
return ((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true);
final Task task = taskOptional.get();
if (task instanceof SinglePhaseSubTask) {
return ((SinglePhaseSubTask) task).doGetLiveReports(true);
} else {
return ((ParallelIndexSupervisorTask) task).doGetLiveReports(true);
}
}
public TaskContainer getTaskContainer(String taskId)
@ -566,6 +589,16 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
return taskRunner.getTaskContainer(taskId);
}
public void keepTasksRunning()
{
taskRunner.setUseTaskFinishLatches(true);
}
public void allowTasksToFinish()
{
taskRunner.setUseTaskFinishLatches(false);
}
public TaskStatus runAndWait(Task task)
{
return taskRunner.runAndWait(injectIfNeeded(task));
@ -837,7 +870,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
{
Map<String, Object> unparseableEvents = ImmutableMap.of("buildSegments", expectedUnparseableEvents);
Map<String, Object> rowStats = ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals));
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
taskId,
@ -861,9 +893,9 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
TaskReport.ReportMap actualReports
)
{
final Optional<IngestionStatsAndErrorsTaskReport> expectedReportOptional
final java.util.Optional<IngestionStatsAndErrorsTaskReport> expectedReportOptional
= expectedReports.findReport("ingestionStatsAndErrors");
final Optional<IngestionStatsAndErrorsTaskReport> actualReportOptional
final java.util.Optional<IngestionStatsAndErrorsTaskReport> actualReportOptional
= actualReports.findReport("ingestionStatsAndErrors");
Assert.assertTrue(expectedReportOptional.isPresent());

View File

@ -27,8 +27,8 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;

View File

@ -34,7 +34,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.SegmentAllocators;

View File

@ -34,6 +34,8 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
@ -481,7 +483,7 @@ public class ParallelIndexSupervisorTaskTest
public void testGetTaskReportOk() throws Exception
{
final String taskId = "task";
final Map<String, Object> report = ImmutableMap.of("foo", "bar");
final TaskReport.ReportMap report = TaskReport.buildTaskReports(new KillTaskReport("taskId", null));
final OverlordClient client = mock(OverlordClient.class);
expect(client.taskReportAsMap(taskId)).andReturn(Futures.immediateFuture(report));

View File

@ -20,8 +20,8 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.junit.Test;
public class PushedSegmentsReportTest

View File

@ -28,8 +28,11 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.Tasks;
@ -335,6 +338,38 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
}
@Test
public void testGetRunningTaskReports() throws Exception
{
final ParallelIndexSupervisorTask task = newTask(
Intervals.of("2017-12/P1M"),
Granularities.DAY,
false,
true
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
task.addToContext(DISABLE_TASK_INJECT_CONTEXT_KEY, true);
// Keep tasks running until finish is triggered
getIndexingServiceClient().keepTasksRunning();
getIndexingServiceClient().runTask(task.getId(), task);
// Allow enough time for sub-tasks to be in running state
Thread.sleep(2000);
// Fetch and verify live reports
TaskReport.ReportMap reportMap = task.doGetLiveReports(true);
IngestionStatsAndErrors statsAndErrors = ((IngestionStatsAndErrorsTaskReport)
reportMap.get("ingestionStatsAndErrors")).getPayload();
Map<String, Object> rowStats = statsAndErrors.getRowStats();
Assert.assertTrue(rowStats.containsKey("totals"));
getIndexingServiceClient().allowTasksToFinish();
TaskStatus taskStatus = getIndexingServiceClient().waitToFinish(task, 2, TimeUnit.MINUTES);
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
}
@Test
public void testRunInParallelIngestNullColumn()
{

View File

@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;

View File

@ -25,8 +25,8 @@ import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;

View File

@ -30,8 +30,8 @@ import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;

View File

@ -19,7 +19,6 @@
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.NamedType;
@ -52,11 +51,11 @@ import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
@ -463,9 +462,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
{
TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue(
reportsFile,
new TypeReference<TaskReport.ReportMap>()
{
}
TaskReport.ReportMap.class
);
return IngestionStatsAndErrors.getPayloadFromTaskReports(
taskReports

View File

@ -23,10 +23,10 @@ import com.google.common.collect.FluentIterable;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;

View File

@ -22,8 +22,8 @@ package org.apache.druid.testsEx.msq;
import com.google.api.client.util.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Yielder;
@ -46,7 +46,6 @@ import org.junit.runner.RunWith;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@RunWith(DruidTestRunner.class)
@Category(MultiStageQuery.class)
@ -236,7 +235,7 @@ public class ITMultiStageQuery
msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
Map<String, TaskReport> statusReport = msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
TaskReport.ReportMap statusReport = msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
MSQTaskReport taskReport = (MSQTaskReport) statusReport.get(MSQTaskReport.REPORT_KEY);
if (taskReport == null) {

View File

@ -26,8 +26,8 @@ import com.google.inject.Inject;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.ISE;
@ -280,7 +280,7 @@ public class OverlordResourceTestClient
}
}
public Map<String, TaskReport> getTaskReport(String taskId)
public TaskReport.ReportMap getTaskReport(String taskId)
{
try {
StatusResponseHolder response = makeRequest(
@ -293,9 +293,7 @@ public class OverlordResourceTestClient
);
return jsonMapper.readValue(
response.getContent(),
new TypeReference<Map<String, TaskReport>>()
{
}
TaskReport.ReportMap.class
);
}
catch (ISE e) {

View File

@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.clients.msq;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.guice.TestClient;
import org.jboss.netty.handler.codec.http.HttpMethod;
import java.util.Map;
/**
* Overlord resource client for MSQ Tasks
*/
public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
{
private ObjectMapper jsonMapper;
@Inject
MsqOverlordResourceTestClient(
@Json ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
super(jsonMapper, httpClient, config);
this.jsonMapper = jsonMapper;
this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
}
public Map<String, TaskReport> getMsqTaskReport(String taskId)
{
try {
StatusResponseHolder response = makeRequest(
HttpMethod.GET,
StringUtils.format(
"%s%s",
getIndexerURL(),
StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId))
)
);
return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, TaskReport>>()
{
});
}
catch (RuntimeException e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -27,21 +27,22 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
@ -65,7 +66,7 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
private final ObjectMapper jsonMapper;
private final IntegrationTestingConfig config;
private final MsqOverlordResourceTestClient overlordClient;
private final OverlordResourceTestClient overlordClient;
private final SqlResourceTestClient msqClient;
@ -74,7 +75,7 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
final ObjectMapper jsonMapper,
final SqlResourceTestClient queryClient,
final IntegrationTestingConfig config,
final MsqOverlordResourceTestClient overlordClient,
final OverlordResourceTestClient overlordClient,
final SqlResourceTestClient msqClient
)
{
@ -83,6 +84,8 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
this.config = config;
this.overlordClient = overlordClient;
this.msqClient = msqClient;
this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
}
@Override
@ -186,9 +189,9 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
/**
* Fetches status reports for a given task
*/
public Map<String, TaskReport> fetchStatusReports(String taskId)
public TaskReport.ReportMap fetchStatusReports(String taskId)
{
return overlordClient.getMsqTaskReport(taskId);
return overlordClient.getTaskReport(taskId);
}
/**

View File

@ -23,10 +23,10 @@ import com.google.common.collect.FluentIterable;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;

View File

@ -21,9 +21,9 @@ package org.apache.druid.tests.indexer;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.GranularityType;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.indexing.common;
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.indexing.common;
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.indexing.common;
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.indexing.common;
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.FileUtils;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.indexing.common;
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -17,13 +17,13 @@
* under the License.
*/
package org.apache.druid.indexing.common;
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import java.util.LinkedHashMap;
import java.util.Optional;
/**
* TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and
@ -74,7 +74,7 @@ public interface TaskReport
@SuppressWarnings("unchecked")
public <T extends TaskReport> Optional<T> findReport(String reportKey)
{
return Optional.fromNullable((T) get(reportKey));
return Optional.ofNullable((T) get(reportKey));
}
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.indexing.common;
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -0,0 +1,397 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.RowMeters;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class TaskReportSerdeTest
{
private final ObjectMapper jsonMapper;
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
public TaskReportSerdeTest()
{
jsonMapper = new DefaultObjectMapper();
jsonMapper.registerSubtypes(ExceptionalTaskReport.class);
}
@Test
public void testSerdeOfIngestionReport() throws Exception
{
IngestionStatsAndErrorsTaskReport originalReport = buildTestIngestionReport();
String reportJson = jsonMapper.writeValueAsString(originalReport);
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
Assert.assertTrue(deserialized instanceof IngestionStatsAndErrorsTaskReport);
IngestionStatsAndErrorsTaskReport deserializedReport = (IngestionStatsAndErrorsTaskReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
}
@Test
public void testSerdeOfKillTaskReport() throws Exception
{
KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3));
String reportJson = jsonMapper.writeValueAsString(originalReport);
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
Assert.assertTrue(deserialized instanceof KillTaskReport);
KillTaskReport deserializedReport = (KillTaskReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
}
@Test
public void testSerdeOfTaskContextReport() throws Exception
{
TaskContextReport originalReport = new TaskContextReport(
"taskId",
ImmutableMap.of("key1", "value1", "key2", "value2")
);
String reportJson = jsonMapper.writeValueAsString(originalReport);
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
Assert.assertTrue(deserialized instanceof TaskContextReport);
TaskContextReport deserializedReport = (TaskContextReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
}
@Test
public void testWriteReportMapToFileAndRead() throws Exception
{
IngestionStatsAndErrorsTaskReport report1 = buildTestIngestionReport();
final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper);
TaskReport.ReportMap reportMap1 = TaskReport.buildTaskReports(report1);
writer.write("testID", reportMap1);
TaskReport.ReportMap reportMap2 = jsonMapper.readValue(reportFile, TaskReport.ReportMap.class);
Assert.assertEquals(reportMap1, reportMap2);
}
@Test
public void testWriteReportMapToStringAndRead() throws Exception
{
IngestionStatsAndErrorsTaskReport ingestionReport = buildTestIngestionReport();
TaskReport.ReportMap reportMap = TaskReport.buildTaskReports(ingestionReport);
String json = jsonMapper.writeValueAsString(reportMap);
TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(json, TaskReport.ReportMap.class);
Assert.assertEquals(reportMap, deserializedReportMap);
}
@Test
@SuppressWarnings("unchecked")
public void testWritePlainMapAndReadAsReportMap() throws Exception
{
final long now = System.currentTimeMillis();
final List<ParseExceptionReport> buildUnparseableEvents = Arrays.asList(
new ParseExceptionReport("abc,def", "invalid timestamp", Arrays.asList("row: 1", "col: 1"), now),
new ParseExceptionReport("xyz,pqr", "invalid timestamp", Arrays.asList("row: 1", "col: 1"), now)
);
final Map<String, Object> unparseableEvents
= ImmutableMap.of("determinePartitions", Collections.emptyList(), "buildSegments", buildUnparseableEvents);
final Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
"processed", 0,
"processedBytes", 0,
"unparseable", 0,
"thrownAway", 0,
"processedWithError", 0
);
final Map<String, Object> emptyAverages = ImmutableMap.of(
"1m", emptyAverageMinuteMap,
"5m", emptyAverageMinuteMap,
"15m", emptyAverageMinuteMap
);
final Map<String, Object> expectedAverages
= ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", emptyAverages);
final RowIngestionMetersTotals determinePartitionTotalStats
= RowMeters.with().errors(10).unparseable(1).thrownAway(1).bytes(2000).totalProcessed(100);
final RowIngestionMetersTotals buildSegmentTotalStats
= RowMeters.with().errors(5).unparseable(2).thrownAway(1).bytes(2500).totalProcessed(150);
final Map<String, Object> expectedTotals
= ImmutableMap.of("determinePartitions", determinePartitionTotalStats, "buildSegments", buildSegmentTotalStats);
final Map<String, Object> expectedRowStats = ImmutableMap.of(
"movingAverages", expectedAverages,
"totals", expectedTotals
);
final Map<String, Object> expectedPayload = new HashMap<>();
expectedPayload.put("ingestionState", IngestionState.COMPLETED);
expectedPayload.put("unparseableEvents", unparseableEvents);
expectedPayload.put("rowStats", expectedRowStats);
final Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
ingestionStatsAndErrors.put("taskId", "abc");
ingestionStatsAndErrors.put("payload", expectedPayload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
final Map<String, Object> expectedReportMap = new HashMap<>();
expectedReportMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
final String plainMapJson = jsonMapper.writeValueAsString(expectedReportMap);
// Verify the top-level structure of the report
final TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(plainMapJson, TaskReport.ReportMap.class);
Optional<IngestionStatsAndErrorsTaskReport> ingestStatsReport = deserializedReportMap.findReport(
"ingestionStatsAndErrors");
Assert.assertTrue(ingestStatsReport.isPresent());
Assert.assertEquals("ingestionStatsAndErrors", ingestStatsReport.get().getReportKey());
Assert.assertEquals("abc", ingestStatsReport.get().getTaskId());
// Verify basic fields in the payload
final IngestionStatsAndErrors observedPayload = ingestStatsReport.get().getPayload();
Assert.assertEquals(expectedPayload.get("ingestionState"), observedPayload.getIngestionState());
Assert.assertNull(observedPayload.getSegmentsRead());
Assert.assertNull(observedPayload.getSegmentsPublished());
Assert.assertNull(observedPayload.getErrorMsg());
Assert.assertNull(observedPayload.getRecordsProcessed());
// Verify stats and unparseable events
final Map<String, Object> observedRowStats = observedPayload.getRowStats();
Assert.assertEquals(expectedAverages, observedRowStats.get("movingAverages"));
final Map<String, Object> observedTotals = (Map<String, Object>) observedRowStats.get("totals");
verifyTotalRowStats(observedTotals, determinePartitionTotalStats, buildSegmentTotalStats);
verifyUnparseableEvents(observedPayload.getUnparseableEvents(), buildUnparseableEvents);
// Re-serialize report map and deserialize as plain map
final String reportMapJson = jsonMapper.writeValueAsString(deserializedReportMap);
final Map<String, Object> deserializedPlainMap = jsonMapper.readValue(
reportMapJson,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
final Map<String, Object> ingestStatsReport2
= (Map<String, Object>) deserializedPlainMap.get("ingestionStatsAndErrors");
// Verify basic fields in the payload
final Map<String, Object> observedPayload2 = (Map<String, Object>) ingestStatsReport2.get("payload");
Assert.assertEquals(expectedPayload.get("ingestionState").toString(), observedPayload2.get("ingestionState"));
Assert.assertNull(observedPayload2.get("segmentsRead"));
Assert.assertNull(observedPayload2.get("segmentsPublished"));
Assert.assertNull(observedPayload2.get("errorMsg"));
Assert.assertNull(observedPayload2.get("recordsProcessed"));
// Verify stats and unparseable events
final Map<String, Object> observedRowStats2 = (Map<String, Object>) observedPayload2.get("rowStats");
Assert.assertEquals(expectedAverages, observedRowStats2.get("movingAverages"));
final Map<String, Object> observedTotals2 = (Map<String, Object>) observedRowStats2.get("totals");
verifyTotalRowStats(observedTotals2, determinePartitionTotalStats, buildSegmentTotalStats);
verifyUnparseableEvents(
(Map<String, Object>) observedPayload2.get("unparseableEvents"),
buildUnparseableEvents
);
}
@Test
public void testSerializationOnMissingPartitionStats() throws Exception
{
String json = "{\n"
+ " \"type\": \"ingestionStatsAndErrors\",\n"
+ " \"taskId\": \"ingestionStatsAndErrors\",\n"
+ " \"payload\": {\n"
+ " \"ingestionState\": \"COMPLETED\",\n"
+ " \"unparseableEvents\": {\n"
+ " \"hello\": \"world\"\n"
+ " },\n"
+ " \"rowStats\": {\n"
+ " \"number\": 1234\n"
+ " },\n"
+ " \"errorMsg\": \"an error message\",\n"
+ " \"segmentAvailabilityConfirmed\": true,\n"
+ " \"segmentAvailabilityWaitTimeMs\": 1000\n"
+ " }\n"
+ "}";
IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport(
IngestionStatsAndErrorsTaskReport.REPORT_KEY,
new IngestionStatsAndErrors(
IngestionState.COMPLETED,
ImmutableMap.of(
"hello", "world"
),
ImmutableMap.of(
"number", 1234
),
"an error message",
true,
1000L,
null,
null,
null
)
);
TaskReport deserialized = jsonMapper.readValue(json, TaskReport.class);
Assert.assertEquals(expected.getTaskId(), deserialized.getTaskId());
Assert.assertEquals(expected, deserialized);
}
@Test
public void testExceptionWhileWritingReport() throws Exception
{
final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper);
writer.write("theTask", TaskReport.buildTaskReports(new ExceptionalTaskReport()));
// Read the file, ensure it's incomplete and not valid JSON. This allows callers to determine the report was
// not complete when written.
Assert.assertEquals(
"{\"report\":{\"type\":\"exceptional\"",
Files.asCharSource(reportFile, StandardCharsets.UTF_8).read()
);
}
private IngestionStatsAndErrorsTaskReport buildTestIngestionReport()
{
return new IngestionStatsAndErrorsTaskReport(
"testID",
new IngestionStatsAndErrors(
IngestionState.BUILD_SEGMENTS,
Collections.singletonMap("hello", "world"),
Collections.singletonMap("number", 1234),
"an error message",
true,
1000L,
Collections.singletonMap("PartitionA", 5000L),
5L,
10L
)
);
}
private void verifyUnparseableEvents(
Map<String, Object> observed,
List<ParseExceptionReport> buildSegmentUnparseableEvents
)
{
Assert.assertEquals(Collections.emptyList(), observed.get("determinePartitions"));
final List<Object> observedBuildSegmentUnparseableEvents
= (List<Object>) observed.get("buildSegments");
Assert.assertEquals(2, observedBuildSegmentUnparseableEvents.size());
for (int i = 0; i < buildSegmentUnparseableEvents.size(); ++i) {
final ParseExceptionReport expectedEvent = buildSegmentUnparseableEvents.get(i);
final Object observedEvent = observedBuildSegmentUnparseableEvents.get(i);
Assert.assertEquals(
ImmutableMap.of(
"input", expectedEvent.getInput(),
"errorType", expectedEvent.getErrorType(),
"details", expectedEvent.getDetails(),
"timeOfExceptionMillis", expectedEvent.getTimeOfExceptionMillis()
),
observedEvent
);
}
}
private void verifyTotalRowStats(
Map<String, Object> observedTotals,
RowIngestionMetersTotals determinePartitionTotalStats,
RowIngestionMetersTotals buildSegmentTotalStats
)
{
Assert.assertEquals(
ImmutableMap.of(
"processed", (int) determinePartitionTotalStats.getProcessed(),
"processedBytes", (int) determinePartitionTotalStats.getProcessedBytes(),
"processedWithError", (int) determinePartitionTotalStats.getProcessedWithError(),
"thrownAway", (int) determinePartitionTotalStats.getThrownAway(),
"unparseable", (int) determinePartitionTotalStats.getUnparseable()
),
observedTotals.get("determinePartitions")
);
Assert.assertEquals(
ImmutableMap.of(
"processed", (int) buildSegmentTotalStats.getProcessed(),
"processedBytes", (int) buildSegmentTotalStats.getProcessedBytes(),
"processedWithError", (int) buildSegmentTotalStats.getProcessedWithError(),
"thrownAway", (int) buildSegmentTotalStats.getThrownAway(),
"unparseable", (int) buildSegmentTotalStats.getUnparseable()
),
observedTotals.get("buildSegments")
);
}
/**
* Task report that throws an exception while being serialized.
*/
@JsonTypeName("exceptional")
private static class ExceptionalTaskReport implements TaskReport
{
@Override
@JsonProperty
public String getTaskId()
{
throw new UnsupportedOperationException("cannot serialize task ID");
}
@Override
public String getReportKey()
{
return "report";
}
@Override
@JsonProperty
public Object getPayload()
{
throw new UnsupportedOperationException("cannot serialize payload");
}
}
}

View File

@ -30,6 +30,7 @@ import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
@ -166,7 +167,7 @@ public interface OverlordClient
* Returns a {@link org.apache.druid.rpc.HttpResponseException} with code
* {@link javax.ws.rs.core.Response.Status#NOT_FOUND} if there is no report available for some reason.
*/
ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId);
ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId);
/**
* Returns the payload for a task as an instance of {@link ClientTaskQuery}. This method only works for tasks

View File

@ -32,6 +32,7 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
@ -214,7 +215,7 @@ public class OverlordClientImpl implements OverlordClient
}
@Override
public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId)
{
final String path = StringUtils.format("/druid/indexer/v1/task/%s/reports", StringUtils.urlEncode(taskId));
@ -223,7 +224,7 @@ public class OverlordClientImpl implements OverlordClient
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
),
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), TaskReport.ReportMap.class)
);
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.client.indexing;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
@ -84,7 +85,7 @@ public class NoopOverlordClient implements OverlordClient
}
@Override
public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId)
{
throw new UnsupportedOperationException();
}

View File

@ -35,6 +35,8 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@ -289,7 +291,7 @@ public class OverlordClientImplTest
public void test_taskReportAsMap() throws Exception
{
final String taskId = "testTaskId";
final Map<String, Object> response = ImmutableMap.of("test", "value");
final TaskReport.ReportMap response = TaskReport.buildTaskReports(new KillTaskReport("taskId", null));
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/testTaskId/reports"),
@ -298,7 +300,7 @@ public class OverlordClientImplTest
jsonMapper.writeValueAsBytes(response)
);
final ListenableFuture<Map<String, Object>> future = overlordClient.taskReportAsMap(taskId);
final ListenableFuture<TaskReport.ReportMap> future = overlordClient.taskReportAsMap(taskId);
Assert.assertEquals(response, future.get());
}
@ -319,7 +321,7 @@ public class OverlordClientImplTest
)
);
final ListenableFuture<Map<String, Object>> future = overlordClient.taskReportAsMap(taskId);
final ListenableFuture<TaskReport.ReportMap> future = overlordClient.taskReportAsMap(taskId);
final ExecutionException e = Assert.assertThrows(
ExecutionException.class,
@ -345,7 +347,7 @@ public class OverlordClientImplTest
StringUtils.toUtf8("{}")
);
final Map<String, Object> actualResponse =
final TaskReport.ReportMap actualResponse =
FutureUtils.getUnchecked(overlordClient.taskReportAsMap(taskID), true);
Assert.assertEquals(Collections.emptyMap(), actualResponse);
}

View File

@ -56,8 +56,8 @@ import org.apache.druid.guice.annotations.AttemptId;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.RemoteChatHandler;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
import org.apache.druid.indexing.worker.Worker;

View File

@ -71,10 +71,10 @@ import org.apache.druid.guice.annotations.AttemptId;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;