Added live reports for Kafka and Native batch task (#8557)

* Added live reports for Kafka and Native batch task

* Removed unused local variables

* Added the missing unit test

* Refine unit test logic, add implementation for HttpRemoteTaskRunner

* checkstyle fixes

* Update doc descriptions for updated API

* remove unnecessary files

* Fix spellcheck complaints

* More details for api descriptions
Rye 2019-09-23 21:08:36 -07:00 committed by Jonathan Wei
parent 52f3f2c229
commit f2a444321b
8 changed files with 350 additions and 45 deletions

View File

@@ -43,17 +43,19 @@ the Overlord APIs.
## Task reports
A report containing information about the number of rows ingested and any parse exceptions that occurred is available for both completed tasks and running tasks.
The reporting feature is supported by the [simple native batch task](../ingestion/native-batch.md#simple-task), the Hadoop batch task, and Kafka and Kinesis ingestion tasks.
### Completion report
After a task completes, a report containing information about the number of rows ingested and any parse exceptions that occurred is available at:
After a task completes, a completion report can be retrieved at:
```
http://<OVERLORD-HOST>:<OVERLORD-PORT>/druid/indexer/v1/task/<task-id>/reports
```
The reporting feature is supported by the [simple native batch task](../ingestion/native-batch.md#simple-task), the Hadoop batch task, and Kafka and Kinesis ingestion tasks.
An example output is shown below, along with a description of the fields:
An example output is shown below:
```json
{
@@ -83,6 +85,70 @@ An example output is shown below, along with a description of the fields:
}
```
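As a minimal illustration of fetching this report (the Overlord address `localhost:8090`, the class name, and the task id below are placeholder assumptions, not part of the API), a plain HTTP GET is enough; the sketch uses Java's built-in HTTP client:
```java
// Illustrative sketch only: fetch a completed task's report from the Overlord.
// The host, port, and task id below are placeholders.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FetchCompletionReport
{
  public static void main(String[] args) throws Exception
  {
    final String taskId = "index_mydatasource_2019-09-23T00:00:00.000Z"; // placeholder task id
    final URI uri = URI.create("http://localhost:8090/druid/indexer/v1/task/" + taskId + "/reports");

    final HttpClient client = HttpClient.newHttpClient();
    final HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
    final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    // The body is the JSON report shown in the example above.
    System.out.println(response.body());
  }
}
```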
### Live report
When a task is running, a live report containing the ingestion state, unparseable events, and moving averages of the number of events processed over the last 1 minute, 5 minutes, and 15 minutes can be retrieved at:
```
http://<OVERLORD-HOST>:<OVERLORD-PORT>/druid/indexer/v1/task/<task-id>/reports
```
and
```
http://<middlemanager-host>:<worker-port>/druid/worker/v1/chat/<task-id>/liveReports
```
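As a rough sketch of polling the second endpoint above while a task runs (the middleManager address `localhost:8091`, the class name, and the task id are placeholder assumptions; adjust them to your deployment):
```java
// Illustrative polling sketch; host, port, and task id are placeholders.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PollLiveReport
{
  public static void main(String[] args) throws Exception
  {
    final String taskId = "index_kafka_mydatasource_0000000000000_abcdefgh"; // placeholder task id
    final URI uri = URI.create("http://localhost:8091/druid/worker/v1/chat/" + taskId + "/liveReports");

    final HttpClient client = HttpClient.newHttpClient();
    final HttpRequest request = HttpRequest.newBuilder(uri).GET().build();

    // Poll a few times while the task is running; stop once the endpoint is no longer available.
    for (int i = 0; i < 5; i++) {
      final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
      if (response.statusCode() != 200) {
        break;
      }
      System.out.println(response.body());
      Thread.sleep(5_000);
    }
  }
}
```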
An example output is shown below:
```json
{
"ingestionStatsAndErrors": {
"taskId": "compact_twitter_2018-09-24T18:24:23.920Z",
"payload": {
"ingestionState": "RUNNING",
"unparseableEvents": {},
"rowStats": {
"movingAverages": {
"buildSegments": {
"5m": {
"processed": 3.392158326408501,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
},
"15m": {
"processed": 1.736165476881023,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
},
"1m": {
"processed": 4.206417693750045,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
}
}
},
"totals": {
"buildSegments": {
"processed": 1994,
"processedWithError": 0,
"thrownAway": 0,
"unparseable": 0
}
}
},
"errorMsg": null
},
"type": "ingestionStatsAndErrors"
}
}
```
A description of the fields:
The `ingestionStatsAndErrors` report provides information about row counts and errors.
The `ingestionState` shows what step of ingestion the task reached. Possible states include:

View File

@@ -65,6 +65,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.RowIngestionMetersTotals;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
@@ -79,6 +80,7 @@ import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -504,6 +506,57 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", publishedDescriptors.get(1)));
}
@Test(timeout = 60_000L)
public void testRunAfterDataInsertedLiveReport() throws Exception
{
// Insert data
insertData();
final KafkaIndexTask task = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 12L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
SeekableStreamIndexTaskRunner runner = task.getRunner();
while (true) {
Thread.sleep(1000);
if (runner.getStatus() == Status.PUBLISHING) {
break;
}
}
Map rowStats = runner.doGetRowStats();
Map totals = (Map) rowStats.get("totals");
RowIngestionMetersTotals buildSegments = (RowIngestionMetersTotals) totals.get("buildSegments");
Map movingAverages = (Map) rowStats.get("movingAverages");
Map buildSegments2 = (Map) movingAverages.get("buildSegments");
HashMap avg_1min = (HashMap) buildSegments2.get("1m");
HashMap avg_5min = (HashMap) buildSegments2.get("5m");
HashMap avg_15min = (HashMap) buildSegments2.get("15m");
runner.resume();
// Check metrics
Assert.assertEquals(buildSegments.getProcessed(), task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(buildSegments.getUnparseable(), task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(buildSegments.getThrownAway(), task.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(avg_1min.get("processed"), 0.0);
Assert.assertEquals(avg_5min.get("processed"), 0.0);
Assert.assertEquals(avg_15min.get("processed"), 0.0);
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
}
@Test(timeout = 60_000L)
public void testIncrementalHandOff() throws Exception
{
@@ -584,7 +637,10 @@ public class KafkaIndexTaskTest
SegmentDescriptor desc5 = sd("2011/P1D", 1);
SegmentDescriptor desc6 = sd("2012/P1D", 0);
SegmentDescriptor desc7 = sd("2013/P1D", 0);
assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
assertEqualsExceptVersion(
ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7),
publishedDescriptors()
);
Assert.assertEquals(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
@@ -725,7 +781,10 @@ public class KafkaIndexTaskTest
SegmentDescriptor desc5 = sd("2011/P1D", 1);
SegmentDescriptor desc6 = sd("2012/P1D", 0);
SegmentDescriptor desc7 = sd("2013/P1D", 0);
assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
assertEqualsExceptVersion(
ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7),
publishedDescriptors()
);
Assert.assertEquals(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
@@ -733,7 +792,10 @@ public class KafkaIndexTaskTest
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
assertEqualsExceptVersion(
ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7),
publishedDescriptors()
);
Assert.assertEquals(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
@@ -1957,7 +2019,10 @@ public class KafkaIndexTaskTest
SegmentDescriptor desc5 = sd("2010/P1D", 0);
SegmentDescriptor desc6 = sd("2011/P1D", 0);
SegmentDescriptor desc7 = sd("2012/P1D", 0);
assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
assertEqualsExceptVersion(
ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7),
publishedDescriptors()
);
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())

View File

@@ -339,15 +339,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
return Response.ok(events).build();
}
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
public Response getRowStats(
@Context final HttpServletRequest req,
@QueryParam("full") String full
)
private Map<String, Object> doGetRowStats(String full)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
Map<String, Object> averagesMap = new HashMap<>();
@@ -396,6 +389,44 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
returnMap.put("totals", totalsMap);
returnMap.put("movingAverages", averagesMap);
return returnMap;
}
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
public Response getRowStats(
@Context final HttpServletRequest req,
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
return Response.ok(doGetRowStats(full)).build();
}
@GET
@Path("/liveReports")
@Produces(MediaType.APPLICATION_JSON)
public Response getLiveReports(
@Context final HttpServletRequest req,
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
Map<String, Object> events = getTaskCompletionUnparseableEvents();
payload.put("ingestionState", ingestionState);
payload.put("unparseableEvents", events);
payload.put("rowStats", doGetRowStats(full));
ingestionStatsAndErrors.put("taskId", getId());
ingestionStatsAndErrors.put("payload", payload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
return Response.ok(returnMap).build();
}
@@ -462,7 +493,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
} else {
dataSchema = ingestionSchema.getDataSchema();
}
ingestionState = IngestionState.BUILD_SEGMENTS;
return generateAndPublishSegments(
toolbox,
@@ -957,7 +987,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
*
* @param interval interval for shardSpec
* @param row input row
*
* @return a shardSpec
*/
ShardSpec getShardSpec(Interval interval, InputRow row)

View File

@@ -50,4 +50,15 @@ public class TaskRunnerTaskLogStreamer implements TaskLogStreamer
return Optional.absent();
}
}
@Override
public Optional<ByteSource> streamTaskReports(String taskId) throws IOException
{
final TaskRunner runner = taskMaster.getTaskRunner().orNull();
if (runner instanceof TaskLogStreamer) {
return ((TaskLogStreamer) runner).streamTaskReports(taskId);
} else {
return Optional.absent();
}
}
}

View File

@@ -638,6 +638,47 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
@Override
public Optional<ByteSource> streamTaskReports(final String taskId)
{
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
if (zkWorker == null) {
// Worker is not running this task, it might be available in deep storage
return Optional.absent();
} else {
TaskLocation taskLocation = runningTasks.get(taskId).getLocation();
final URL url = TaskRunnerUtils.makeTaskLocationURL(
taskLocation,
"/druid/worker/v1/chat/%s/liveReports",
taskId
);
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
return httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}
);
}
}
/**
* Adds a task to the pending queue
*/
@@ -753,7 +794,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
* needs to bootstrap after a restart.
*
* @param taskRunnerWorkItem - the task to assign
*
* @return true iff the task is now assigned
*/
private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
@@ -787,8 +827,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
Maps.filterEntries(
zkWorkers,
input -> !lazyWorkers.containsKey(input.getKey()) &&
!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
!blackListedWorkers.contains(input.getValue())
!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
!blackListedWorkers.contains(input.getValue())
),
(String key, ZkWorker value) -> value.toImmutable()
)
@@ -832,7 +872,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
*
* @param theZkWorker The worker the task is assigned to
* @param taskRunnerWorkItem The task to be assigned
*
* @return boolean indicating whether the task was successfully assigned or not
*/
private boolean announceTask(
@@ -911,7 +950,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
* The RemoteTaskRunner updates state according to these changes.
*
* @param worker contains metadata for a worker that has appeared in ZK
*
* @return future that will contain a fully initialized worker
*/
private ListenableFuture<ZkWorker> addWorker(final Worker worker)

View File

@@ -113,4 +113,25 @@ public class TaskRunnerUtils
throw new RuntimeException(e);
}
}
public static URL makeTaskLocationURL(TaskLocation taskLocation, String pathFormat, String... pathParams)
{
Preconditions.checkArgument(pathFormat.startsWith("/"), "path must start with '/': %s", pathFormat);
final String path = StringUtils.format(
pathFormat,
Arrays.stream(pathParams).map(StringUtils::urlEncode).toArray()
);
try {
return new URI(StringUtils.format(
"http://%s:%s%s",
taskLocation.getHost(),
taskLocation.getPort(),
path
)).toURL();
}
catch (URISyntaxException | MalformedURLException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -331,8 +331,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
Maps.filterEntries(
workers,
input -> !lazyWorkers.containsKey(input.getKey()) &&
!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
!blackListedWorkers.containsKey(input.getKey())
!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
!blackListedWorkers.containsKey(input.getKey())
),
(String key, WorkerHolder value) -> value.toImmutable()
)
@@ -902,6 +902,52 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
@Override
public Optional<ByteSource> streamTaskReports(String taskId)
{
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
Worker worker = null;
if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
worker = taskRunnerWorkItem.getWorker();
}
if (worker == null || !workers.containsKey(worker.getHost())) {
// Worker is not running this task, it might be available in deep storage
return Optional.absent();
} else {
// Worker is still running this task
TaskLocation taskLocation = taskRunnerWorkItem.getLocation();
final URL url = TaskRunnerUtils.makeTaskLocationURL(
taskLocation,
"/druid/worker/v1/chat/%s/liveReports",
taskId
);
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
return httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}
);
}
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{

View File

@@ -713,8 +713,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
if (sequenceToCheckpoint != null && stillReading) {
Preconditions.checkArgument(
getLastSequenceMetadata()
.getSequenceName()
.equals(sequenceToCheckpoint.getSequenceName()),
.getSequenceName()
.equals(sequenceToCheckpoint.getSequenceName()),
"Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s",
sequenceToCheckpoint,
sequences
@@ -1488,14 +1488,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return setEndOffsets(sequences, finish);
}
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
public Response getRowStats(
@Context final HttpServletRequest req
)
public Map<String, Object> doGetRowStats()
{
authorizationCheck(req, Action.READ);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
Map<String, Object> averagesMap = new HashMap<>();
@@ -1511,9 +1505,51 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
returnMap.put("movingAverages", averagesMap);
returnMap.put("totals", totalsMap);
return Response.ok(returnMap).build();
return returnMap;
}
public Map<String, Object> doGetLiveReports()
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
Map<String, Object> events = getTaskCompletionUnparseableEvents();
payload.put("ingestionState", ingestionState);
payload.put("unparseableEvents", events);
payload.put("rowStats", doGetRowStats());
ingestionStatsAndErrors.put("taskId", task.getId());
ingestionStatsAndErrors.put("payload", payload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
return returnMap;
}
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
public Response getRowStats(
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.READ);
return Response.ok(doGetRowStats()).build();
}
@GET
@Path("/liveReports")
@Produces(MediaType.APPLICATION_JSON)
public Response getLiveReport(
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.READ);
return Response.ok(doGetLiveReports()).build();
}
@GET
@Path("/unparseableEvents")
@Produces(MediaType.APPLICATION_JSON)
@@ -1586,7 +1622,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : sequenceNumbers.entrySet()) {
if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey()))) < 0) {
if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
< 0) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
StringUtils.format(
@@ -1778,7 +1815,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
/**
* This method does two things:
*
* <p>
* 1) Verifies that the sequence numbers we read are at least as high as those read previously, and throws an
* exception if not.
* 2) Returns false if we should skip this record because it's either (a) the first record in a partition that we are
@@ -1829,9 +1866,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
*
* @param toolbox task toolbox
* @param checkpointsString the json-serialized checkpoint string
*
* @return checkpoint
*
* @throws IOException jsonProcessingException
*/
@Nullable
@@ -1845,7 +1880,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
* This is what would become the start offsets of the next reader, if we stopped reading now.
*
* @param sequenceNumber the sequence number that has already been processed
*
* @return next sequence number to be stored
*/
protected abstract SequenceOffsetType getNextStartOffset(SequenceOffsetType sequenceNumber);
@@ -1855,7 +1889,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
*
* @param mapper json objectMapper
* @param object metadata
*
* @return SeekableStreamEndSequenceNumbers
*/
protected abstract SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> deserializePartitionsFromMetadata(
@@ -1869,9 +1902,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
*
* @param recordSupplier
* @param toolbox
*
* @return list of records polled, can be empty but cannot be null
*
* @throws Exception
*/
@NotNull
@@ -1884,7 +1915,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
* creates specific implementations of kafka/kinesis datasource metadata
*
* @param partitions partitions used to create the datasource metadata
*
* @return datasource metadata
*/
protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadata(
@@ -1895,7 +1925,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
* create a specific implementation of Kafka/Kinesis sequence number/offset used for comparison mostly
*
* @param sequenceNumber
*
* @return a specific OrderedSequenceNumber instance for Kafka/Kinesis
*/
protected abstract OrderedSequenceNumber<SequenceOffsetType> createSequenceNumber(SequenceOffsetType sequenceNumber);