From f2a444321b6d356eaaff7954bc63988f885d0fc3 Mon Sep 17 00:00:00 2001
From: Rye
Date: Mon, 23 Sep 2019 21:08:36 -0700
Subject: [PATCH] Added live reports for Kafka and Native batch task (#8557)

* Added live reports for Kafka and Native batch task
* Removed unused local variables
* Added the missing unit test
* Refine unit test logic, add implementation for HttpRemoteTaskRunner
* checkstyle fixes
* Update doc descriptions for updated API
* remove unnecessary files
* Fix spellcheck complaints
* More details for api descriptions
---
 docs/ingestion/tasks.md                        | 74 ++++++++++++++++++-
 .../indexing/kafka/KafkaIndexTaskTest.java     | 73 +++++++++++++++++-
 .../druid/indexing/common/task/IndexTask.java  | 49 +++++++++---
 .../tasklogs/TaskRunnerTaskLogStreamer.java    | 11 +++
 .../indexing/overlord/RemoteTaskRunner.java    | 48 ++++++++++--
 .../indexing/overlord/TaskRunnerUtils.java     | 21 ++++++
 .../overlord/hrtr/HttpRemoteTaskRunner.java    | 50 ++++++++++++-
 .../SeekableStreamIndexTaskRunner.java         | 69 ++++++++++++-----
 8 files changed, 350 insertions(+), 45 deletions(-)
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index e51ed7f68be..d658d0816a9 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -43,17 +43,19 @@ the Overlord APIs.
 
 ## Task reports
 
+A report containing information about the number of rows ingested and any parse exceptions that occurred is available for both completed and running tasks.
+
+The reporting feature is supported by the [simple native batch task](../ingestion/native-batch.md#simple-task), the Hadoop batch task, and Kafka and Kinesis ingestion tasks.
+
 ### Completion report
 
-After a task completes, a report containing information about the number of rows ingested and any parse exceptions that occurred is available at:
+After a task completes, a completion report can be retrieved at:
 
 ```
 http://<OVERLORD-HOST>:<OVERLORD-PORT>/druid/indexer/v1/task/<task-id>/reports
 ```
 
-The reporting feature is supported by the [simple native batch task](../ingestion/native-batch.md#simple-task), the Hadoop batch task, and Kafka and Kinesis ingestion tasks.
-
-An example output is shown below, along with a description of the fields:
+An example output is shown below:
 
 ```json
 {
@@ -83,6 +85,70 @@ An example output is shown below, along with a description of the fields:
 }
 ```
 
+### Live report
+
+When a task is running, a live report containing the current ingestion state, any unparseable events, and moving averages (over the last 1-minute, 5-minute, and 15-minute windows) of the number of events processed can be retrieved at:
+
+```
+http://<OVERLORD-HOST>:<OVERLORD-PORT>/druid/indexer/v1/task/<task-id>/reports
+```
+
+and
+
+```
+http://<middlemanager-host>:<worker-port>/druid/worker/v1/chat/<task-id>/liveReports
+```
+
+An example output is shown below:
+
+```json
+{
+  "ingestionStatsAndErrors": {
+    "taskId": "compact_twitter_2018-09-24T18:24:23.920Z",
+    "payload": {
+      "ingestionState": "RUNNING",
+      "unparseableEvents": {},
+      "rowStats": {
+        "movingAverages": {
+          "buildSegments": {
+            "5m": {
+              "processed": 3.392158326408501,
+              "unparseable": 0,
+              "thrownAway": 0,
+              "processedWithError": 0
+            },
+            "15m": {
+              "processed": 1.736165476881023,
+              "unparseable": 0,
+              "thrownAway": 0,
+              "processedWithError": 0
+            },
+            "1m": {
+              "processed": 4.206417693750045,
+              "unparseable": 0,
+              "thrownAway": 0,
+              "processedWithError": 0
+            }
+          }
+        },
+        "totals": {
+          "buildSegments": {
+            "processed": 1994,
+            "processedWithError": 0,
+            "thrownAway": 0,
+            "unparseable": 0
+          }
+        }
+      },
+      "errorMsg": null
+    },
+    "type": "ingestionStatsAndErrors"
+  }
+}
+```
+
+A description of the fields:
+
 The `ingestionStatsAndErrors` report provides information about row counts and errors.
 
 The `ingestionState` shows what step of ingestion the task reached. Possible states include:
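For reference, here is a minimal standalone client for the reports endpoint documented above. It is only an illustrative sketch, not part of the patch: the Overlord address and task id are hypothetical placeholders, and it uses the JDK 11 `HttpClient`.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LiveReportFetcher
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical values -- substitute your own Overlord address and task id.
    final String overlord = "localhost:8090";
    final String taskId = "index_kafka_mytopic_0123456789abcdef";

    // While the task runs this returns the live report; after the task
    // completes, the same endpoint serves the completion report.
    final HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://" + overlord + "/druid/indexer/v1/task/" + taskId + "/reports"))
        .GET()
        .build();

    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}
```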
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 89062b31b8f..51cda7d24b1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -65,6 +65,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersTotals;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
@@ -79,6 +80,7 @@ import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -504,6 +506,57 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", publishedDescriptors.get(1)));
   }
 
+  @Test(timeout = 60_000L)
+  public void testRunAfterDataInsertedLiveReport() throws Exception
+  {
+    // Insert data
+    insertData();
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()),
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 12L)),
+            kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    SeekableStreamIndexTaskRunner<Integer, Long> runner = task.getRunner();
+    while (true) {
+      Thread.sleep(1000);
+      if (runner.getStatus() == Status.PUBLISHING) {
+        break;
+      }
+    }
+    Map<String, Object> rowStats = runner.doGetRowStats();
+    Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
+    RowIngestionMetersTotals buildSegments = (RowIngestionMetersTotals) totals.get("buildSegments");
+
+    Map<String, Object> movingAverages = (Map<String, Object>) rowStats.get("movingAverages");
+    Map<String, Object> buildSegments2 = (Map<String, Object>) movingAverages.get("buildSegments");
+    HashMap<String, Double> avg_1min = (HashMap<String, Double>) buildSegments2.get("1m");
+    HashMap<String, Double> avg_5min = (HashMap<String, Double>) buildSegments2.get("5m");
+    HashMap<String, Double> avg_15min = (HashMap<String, Double>) buildSegments2.get("15m");
+
+    runner.resume();
+
+    // Check metrics
+    Assert.assertEquals(buildSegments.getProcessed(), task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(buildSegments.getUnparseable(), task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(buildSegments.getThrownAway(), task.getRunner().getRowIngestionMeters().getThrownAway());
+
+    Assert.assertEquals(avg_1min.get("processed"), 0.0);
+    Assert.assertEquals(avg_5min.get("processed"), 0.0);
+    Assert.assertEquals(avg_15min.get("processed"), 0.0);
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+  }
+
   @Test(timeout = 60_000L)
   public void testIncrementalHandOff() throws Exception
   {
@@ -584,7 +637,10 @@ public class KafkaIndexTaskTest
     SegmentDescriptor desc5 = sd("2011/P1D", 1);
     SegmentDescriptor desc6 = sd("2012/P1D", 0);
     SegmentDescriptor desc7 = sd("2013/P1D", 0);
-    assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
+    assertEqualsExceptVersion(
+        ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7),
+        publishedDescriptors()
+    );
     Assert.assertEquals(
         new KafkaDataSourceMetadata(
             new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
@@ -725,7 +781,10 @@ public class KafkaIndexTaskTest
     SegmentDescriptor desc5 = sd("2011/P1D", 1);
     SegmentDescriptor desc6 = sd("2012/P1D", 0);
     SegmentDescriptor desc7 = sd("2013/P1D", 0);
-    assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
+    assertEqualsExceptVersion(
+        ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7),
+        publishedDescriptors()
+    );
     Assert.assertEquals(
         new KafkaDataSourceMetadata(
             new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
@@ -733,7 +792,10 @@ public class KafkaIndexTaskTest
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
 
-    assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
+    assertEqualsExceptVersion(
+        ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7),
+        publishedDescriptors()
+    );
     Assert.assertEquals(
         new KafkaDataSourceMetadata(
             new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
@@ -1957,7 +2019,10 @@ public class KafkaIndexTaskTest
     SegmentDescriptor desc5 = sd("2010/P1D", 0);
     SegmentDescriptor desc6 = sd("2011/P1D", 0);
     SegmentDescriptor desc7 = sd("2012/P1D", 0);
-    assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
+    assertEqualsExceptVersion(
+        ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7),
+        publishedDescriptors()
+    );
     Assert.assertEquals(
         new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
= sd("2011/P1D", 0); SegmentDescriptor desc7 = sd("2012/P1D", 0); - assertEqualsExceptVersion(ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); + assertEqualsExceptVersion( + ImmutableList.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), + publishedDescriptors() + ); Assert.assertEquals( new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index eed2135906b..ae9593ee779 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -339,15 +339,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler return Response.ok(events).build(); } - @GET - @Path("/rowStats") - @Produces(MediaType.APPLICATION_JSON) - public Response getRowStats( - @Context final HttpServletRequest req, - @QueryParam("full") String full - ) + private Map doGetRowStats(String full) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); Map returnMap = new HashMap<>(); Map totalsMap = new HashMap<>(); Map averagesMap = new HashMap<>(); @@ -396,6 +389,44 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler returnMap.put("totals", totalsMap); returnMap.put("movingAverages", averagesMap); + return returnMap; + } + + @GET + @Path("/rowStats") + @Produces(MediaType.APPLICATION_JSON) + public Response getRowStats( + @Context final HttpServletRequest req, + @QueryParam("full") String full + ) + { + IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + return Response.ok(doGetRowStats(full)).build(); + } + + @GET + @Path("/liveReports") + @Produces(MediaType.APPLICATION_JSON) + public Response getLiveReports( + @Context final HttpServletRequest req, + @QueryParam("full") String full + ) + { + IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + Map returnMap = new HashMap<>(); + Map ingestionStatsAndErrors = new HashMap<>(); + Map payload = new HashMap<>(); + Map events = getTaskCompletionUnparseableEvents(); + + payload.put("ingestionState", ingestionState); + payload.put("unparseableEvents", events); + payload.put("rowStats", doGetRowStats(full)); + + ingestionStatsAndErrors.put("taskId", getId()); + ingestionStatsAndErrors.put("payload", payload); + ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); + + returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); return Response.ok(returnMap).build(); } @@ -462,7 +493,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler } else { dataSchema = ingestionSchema.getDataSchema(); } - ingestionState = IngestionState.BUILD_SEGMENTS; return generateAndPublishSegments( toolbox, @@ -957,7 +987,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler * * @param interval interval for shardSpec * @param row input row - * * @return a shardSpec */ ShardSpec getShardSpec(Interval interval, InputRow row) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
index 564d1d8fa20..89b5d648cbe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
@@ -50,4 +50,15 @@ public class TaskRunnerTaskLogStreamer implements TaskLogStreamer
       return Optional.absent();
     }
   }
+
+  @Override
+  public Optional<ByteSource> streamTaskReports(String taskId) throws IOException
+  {
+    final TaskRunner runner = taskMaster.getTaskRunner().orNull();
+    if (runner instanceof TaskLogStreamer) {
+      return ((TaskLogStreamer) runner).streamTaskReports(taskId);
+    } else {
+      return Optional.absent();
+    }
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 6206b9d0282..dcbc64baa3e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -638,6 +638,47 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
   }
 
+  @Override
+  public Optional<ByteSource> streamTaskReports(final String taskId)
+  {
+    final ZkWorker zkWorker = findWorkerRunningTask(taskId);
+
+    if (zkWorker == null) {
+      // Worker is not running this task, it might be available in deep storage
+      return Optional.absent();
+    } else {
+      TaskLocation taskLocation = runningTasks.get(taskId).getLocation();
+      final URL url = TaskRunnerUtils.makeTaskLocationURL(
+          taskLocation,
+          "/druid/worker/v1/chat/%s/liveReports",
+          taskId
+      );
+      return Optional.of(
+          new ByteSource()
+          {
+            @Override
+            public InputStream openStream() throws IOException
+            {
+              try {
+                return httpClient.go(
+                    new Request(HttpMethod.GET, url),
+                    new InputStreamResponseHandler()
+                ).get();
+              }
+              catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              catch (ExecutionException e) {
+                // Unwrap if possible
+                Throwables.propagateIfPossible(e.getCause(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          }
+      );
+    }
+  }
+
   /**
    * Adds a task to the pending queue
    */
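`streamTaskReports` returns a lazy Guava `ByteSource`: no HTTP request is made until `openStream()` is called. A sketch of how a caller might drain it (illustrative only; it assumes the same Guava `Optional` and `ByteSource` types used in the patch):

```java
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.druid.tasklogs.TaskLogStreamer;

public class ReportStreamReader
{
  /**
   * Reads a task's live report through any TaskLogStreamer (for example the
   * RemoteTaskRunner above), returning null when no worker is running the task.
   */
  public static String readReport(TaskLogStreamer streamer, String taskId) throws IOException
  {
    final Optional<ByteSource> maybeReport = streamer.streamTaskReports(taskId);
    if (!maybeReport.isPresent()) {
      return null;
    }
    // openStream() issues the HTTP request lazily; read the response as UTF-8.
    return maybeReport.get().asCharSource(StandardCharsets.UTF_8).read();
  }
}
```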
@@ -753,7 +794,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
    * needs to bootstrap after a restart.
    *
    * @param taskRunnerWorkItem - the task to assign
-   *
    * @return true iff the task is now assigned
    */
   private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
@@ -787,8 +827,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
               Maps.filterEntries(
                   zkWorkers,
                   input -> !lazyWorkers.containsKey(input.getKey()) &&
-                           !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
-                           !blackListedWorkers.contains(input.getValue())
+                      !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                      !blackListedWorkers.contains(input.getValue())
               ),
               (String key, ZkWorker value) -> value.toImmutable()
           )
@@ -832,7 +872,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
    *
    * @param theZkWorker        The worker the task is assigned to
    * @param taskRunnerWorkItem The task to be assigned
-   *
    * @return boolean indicating whether the task was successfully assigned or not
    */
   private boolean announceTask(
@@ -911,7 +950,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
    * The RemoteTaskRunner updates state according to these changes.
    *
    * @param worker contains metadata for a worker that has appeared in ZK
-   *
    * @return future that will contain a fully initialized worker
    */
   private ListenableFuture<ZkWorker> addWorker(final Worker worker)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
index e2ec90e1b0d..49c44c698fb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
@@ -113,4 +113,25 @@ public class TaskRunnerUtils
       throw new RuntimeException(e);
     }
   }
+
+  public static URL makeTaskLocationURL(TaskLocation taskLocation, String pathFormat, String... pathParams)
+  {
+    Preconditions.checkArgument(pathFormat.startsWith("/"), "path must start with '/': %s", pathFormat);
+    final String path = StringUtils.format(
+        pathFormat,
+        Arrays.stream(pathParams).map(StringUtils::urlEncode).toArray()
+    );
+
+    try {
+      return new URI(StringUtils.format(
+          "http://%s:%s%s",
+          taskLocation.getHost(),
+          taskLocation.getPort(),
+          path
+      )).toURL();
+    }
+    catch (URISyntaxException | MalformedURLException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 0657168f46c..b6222d269c0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -331,8 +331,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
               Maps.filterEntries(
                   workers,
                   input -> !lazyWorkers.containsKey(input.getKey()) &&
-                           !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
-                           !blackListedWorkers.containsKey(input.getKey())
+                      !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                      !blackListedWorkers.containsKey(input.getKey())
               ),
               (String key, WorkerHolder value) -> value.toImmutable()
           )
@@ -902,6 +902,52 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
   }
 
+  @Override
+  public Optional<ByteSource> streamTaskReports(String taskId)
+  {
+    HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
+    Worker worker = null;
+    if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
+      worker = taskRunnerWorkItem.getWorker();
+    }
+
+    if (worker == null || !workers.containsKey(worker.getHost())) {
+      // Worker is not running this task, it might be available in deep storage
+      return Optional.absent();
+    } else {
+      // Worker is still running this task
+      TaskLocation taskLocation = taskRunnerWorkItem.getLocation();
+      final URL url = TaskRunnerUtils.makeTaskLocationURL(
+          taskLocation,
+          "/druid/worker/v1/chat/%s/liveReports",
+          taskId
+      );
+      return Optional.of(
+          new ByteSource()
+          {
+            @Override
+            public InputStream openStream() throws IOException
+            {
+              try {
+                return httpClient.go(
+                    new Request(HttpMethod.GET, url),
+                    new InputStreamResponseHandler()
+                ).get();
+              }
+              catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              catch (ExecutionException e) {
+                // Unwrap if possible
+                Throwables.propagateIfPossible(e.getCause(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          }
+      );
+    }
+  }
+
   @Override
   public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
   {
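Both remote runners build the worker chat URL through the new `TaskRunnerUtils.makeTaskLocationURL` helper. A small usage sketch (the worker location and task id are hypothetical):

```java
import java.net.URL;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;

public class TaskLocationUrlExample
{
  public static void main(String[] args)
  {
    // Hypothetical worker location: host, plaintext port, TLS port (-1 = none).
    final TaskLocation location = TaskLocation.create("worker1.example.com", 8100, -1);

    // Path parameters are URL-encoded and substituted into the format string.
    final URL url = TaskRunnerUtils.makeTaskLocationURL(
        location,
        "/druid/worker/v1/chat/%s/liveReports",
        "my-task-id"
    );

    // Prints: http://worker1.example.com:8100/druid/worker/v1/chat/my-task-id/liveReports
    System.out.println(url);
  }
}
```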
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index bc6d03ff6c5..3d256ffba41 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -713,8 +713,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
 
-  @GET
-  @Path("/rowStats")
-  @Produces(MediaType.APPLICATION_JSON)
-  public Response getRowStats(@Context final HttpServletRequest req)
+  private Map<String, Object> doGetRowStats()
   {
-    authorizationCheck(req, Action.READ);
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> totalsMap = new HashMap<>();
     Map<String, Object> averagesMap = new HashMap<>();
@@ -1511,9 +1505,51 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
     returnMap.put("totals", totalsMap);
     returnMap.put("movingAverages", averagesMap);
-    return Response.ok(returnMap).build();
+    return returnMap;
+  }
+
+  private Map<String, Object> doGetLiveReports()
+  {
+    Map<String, Object> returnMap = new HashMap<>();
+    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+    Map<String, Object> payload = new HashMap<>();
+    Map<String, Object> events = getTaskCompletionUnparseableEvents();
+
+    payload.put("ingestionState", ingestionState);
+    payload.put("unparseableEvents", events);
+    payload.put("rowStats", doGetRowStats());
+
+    ingestionStatsAndErrors.put("taskId", task.getId());
+    ingestionStatsAndErrors.put("payload", payload);
+    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+    return returnMap;
+  }
+
+  @GET
+  @Path("/rowStats")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getRowStats(
+      @Context final HttpServletRequest req
+  )
+  {
+    authorizationCheck(req, Action.READ);
+    return Response.ok(doGetRowStats()).build();
+  }
+
+  @GET
+  @Path("/liveReports")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getLiveReport(
+      @Context final HttpServletRequest req
+  )
+  {
+    authorizationCheck(req, Action.READ);
+    return Response.ok(doGetLiveReports()).build();
+  }
+
+
   @GET
   @Path("/unparseableEvents")
   @Produces(MediaType.APPLICATION_JSON)
@@ -1586,7 +1622,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
     for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : sequenceNumbers.entrySet()) {
-      if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey()))) < 0) {
+      if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
+          < 0) {
         return Response.status(Response.Status.BAD_REQUEST)
                        .entity(
                            StringUtils.format(
@@ -1778,7 +1815,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
-   * 1) Verifies that the sequence numbers we read are at least as high as those read previously, and throws an exception if not.
+   * 1) Verifies that the sequence numbers we read are at least as high as those read previously, and throws an
+   * exception if not.
    * 2) Returns false if we should skip this record because it's either (a) the first record in a partition that we are
@@ -1829,9 +1866,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
-   *
    * @return the sequence numbers deserialized from the metadata
-   *
    */
   protected abstract SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> deserializePartitionsFromMetadata(
@@ -1869,9 +1902,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
-   *
    * @return datasource metadata for the given sequence numbers
-   *
    */
   protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadata(
@@ -1895,7 +1925,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
    * @param sequenceNumber the sequence number to wrap
-   *
    * @return a specific OrderedSequenceNumber
    */
   protected abstract OrderedSequenceNumber<SequenceOffsetType> createSequenceNumber(SequenceOffsetType sequenceNumber);
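The row-stats map assembled by `doGetRowStats()` is traversed with plain casts in the Kafka test above; the same walk can be written as a small helper. A sketch (illustrative, not part of the patch):

```java
import java.util.Map;

public class RowStatsInspector
{
  /**
   * Returns true if the 1-minute moving average shows rows being processed,
   * given the map returned by doGetRowStats(), shaped as
   * {"movingAverages": {"buildSegments": {"1m": {"processed": ...}, ...}}, "totals": {...}}.
   */
  @SuppressWarnings("unchecked")
  public static boolean recentlyActive(Map<String, Object> rowStats)
  {
    final Map<String, Object> movingAverages = (Map<String, Object>) rowStats.get("movingAverages");
    final Map<String, Object> buildSegments = (Map<String, Object>) movingAverages.get("buildSegments");
    final Map<String, Object> oneMinute = (Map<String, Object>) buildSegments.get("1m");
    return ((Number) oneMinute.get("processed")).doubleValue() > 0.0;
  }
}
```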