Splitting KafkaIndexTask for better code maintenance (#5854)

* Refactoring KafkaIndexTask for better code maintenance

* fix bug

* fix test

* add annotation

* fix checkstyle

* remove SetEndOffsetsResult
This commit is contained in:
Jihoon Son 2018-06-22 13:00:03 -07:00 committed by GitHub
parent 1a7adabf57
commit 8c5ded0fad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 3134 additions and 2204 deletions

View File

@ -0,0 +1,71 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.google.common.annotations.VisibleForTesting;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.kafka.KafkaIndexTask.Status;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.firehose.ChatHandler;
import javax.ws.rs.core.Response;
import java.util.Map;
/**
* This class is used by only {@link KafkaIndexTask}. We currently have two implementations of this interface, i.e.,
* {@link IncrementalPublishingKafkaIndexTaskRunner} and {@link LegacyKafkaIndexTaskRunner}. The latter one was used in
* the versions prior to 0.12.0, but being kept to support rolling update from them.
*
* We don't have a good reason for having this interface except for better code maintenance for the latest kakfa
* indexing algorithm. As a result, this interface can be removed in the future when {@link LegacyKafkaIndexTaskRunner}
* is removed and it's no longer useful.
*/
public interface KafkaIndexTaskRunner extends ChatHandler
{
Appenderator getAppenderator();
TaskStatus run(TaskToolbox toolbox);
void stopGracefully();
// The below methods are mostly for unit testing.
@VisibleForTesting
RowIngestionMeters getRowIngestionMeters();
@VisibleForTesting
Status getStatus();
@VisibleForTesting
Map<Integer, Long> getCurrentOffsets();
@VisibleForTesting
Map<Integer, Long> getEndOffsets();
@VisibleForTesting
Response setEndOffsets(
Map<Integer, Long> offsets,
boolean resume,
boolean finish // this field is only for internal purposes, shouldn't be usually set by users
) throws InterruptedException;
@VisibleForTesting
Response pause(long timeout) throws InterruptedException;
@VisibleForTesting
void resume() throws InterruptedException;
}

View File

@ -36,28 +36,25 @@ import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskReport;
import io.druid.indexing.common.TaskReportFileWriter;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.IndexTaskTest;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexer.TaskState;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskReport;
import io.druid.indexing.common.TaskReportFileWriter;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestUtils;
@ -66,6 +63,9 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.IndexTaskTest;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.kafka.supervisor.KafkaSupervisor;
import io.druid.indexing.kafka.test.TestBroker;
@ -395,9 +395,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -434,7 +434,7 @@ public class KafkaIndexTaskTest
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for the task to start reading
while (task.getStatus() != KafkaIndexTask.Status.READING) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.READING) {
Thread.sleep(10);
}
@ -449,9 +449,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -507,13 +507,13 @@ public class KafkaIndexTaskTest
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(10);
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets());
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap()
.equals(currentOffsets));
task.setEndOffsets(currentOffsets, true, false);
task.getRunner().setEndOffsets(currentOffsets, true, false);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
@ -527,9 +527,9 @@ public class KafkaIndexTaskTest
));
// Check metrics
Assert.assertEquals(8, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
@ -598,12 +598,12 @@ public class KafkaIndexTaskTest
final ListenableFuture<TaskStatus> future = runTask(task);
// task will pause for checkpointing
while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(10);
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets());
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertTrue(checkpoint.getPartitionOffsetMap().equals(currentOffsets));
task.setEndOffsets(currentOffsets, true, false);
task.getRunner().setEndOffsets(currentOffsets, true, false);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
@ -617,9 +617,9 @@ public class KafkaIndexTaskTest
));
// Check metrics
Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
@ -656,7 +656,7 @@ public class KafkaIndexTaskTest
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for the task to start reading
while (task.getStatus() != KafkaIndexTask.Status.READING) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.READING) {
Thread.sleep(10);
}
@ -671,9 +671,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -710,7 +710,7 @@ public class KafkaIndexTaskTest
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for the task to start reading
while (task.getStatus() != KafkaIndexTask.Status.READING) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.READING) {
Thread.sleep(10);
}
@ -725,9 +725,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
@ -774,7 +774,7 @@ public class KafkaIndexTaskTest
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for the task to start reading
while (task.getStatus() != KafkaIndexTask.Status.READING) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.READING) {
Thread.sleep(10);
}
@ -789,9 +789,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(1, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(4, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
@ -837,9 +837,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(0, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
@ -878,9 +878,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -933,9 +933,9 @@ public class KafkaIndexTaskTest
);
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -988,9 +988,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.FAILED, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(1, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
@ -1035,10 +1035,10 @@ public class KafkaIndexTaskTest
Assert.assertEquals(null, status.getErrorMsg());
// Check metrics
Assert.assertEquals(4, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -1117,10 +1117,10 @@ public class KafkaIndexTaskTest
IndexTaskTest.checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
@ -1197,12 +1197,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task1.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -1266,12 +1266,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.FAILED, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task2.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task1.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task2.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata, should all be from the first task
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -1341,12 +1341,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task2.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task1.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task2.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
@ -1393,9 +1393,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(5, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(5, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -1473,12 +1473,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task1.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -1564,12 +1564,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(2, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(2, task1.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -1618,15 +1618,15 @@ public class KafkaIndexTaskTest
}
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(KafkaIndexTask.Status.READING, task.getStatus());
Assert.assertEquals(KafkaIndexTask.Status.READING, task.getRunner().getStatus());
Map<Integer, Long> currentOffsets = objectMapper.readValue(
task.pause(0).getEntity().toString(),
task.getRunner().pause(0).getEntity().toString(),
new TypeReference<Map<Integer, Long>>()
{
}
);
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
// Insert remaining data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@ -1643,17 +1643,17 @@ public class KafkaIndexTaskTest
// carry on..
}
Assert.assertEquals(currentOffsets, task.getCurrentOffsets());
Assert.assertEquals(currentOffsets, task.getRunner().getCurrentOffsets());
task.resume();
task.getRunner().resume();
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets());
Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -1695,50 +1695,50 @@ public class KafkaIndexTaskTest
}
}
while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(25);
}
// reached the end of the assigned offsets and paused instead of publishing
Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets());
Assert.assertEquals(ImmutableMap.of(0, 3L), task.getRunner().getEndOffsets());
Map<Integer, Long> newEndOffsets = ImmutableMap.of(0, 4L);
task.setEndOffsets(newEndOffsets, false, true);
Assert.assertEquals(newEndOffsets, task.getEndOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
task.resume();
task.getRunner().setEndOffsets(newEndOffsets, false, true);
Assert.assertEquals(newEndOffsets, task.getRunner().getEndOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
task.getRunner().resume();
while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(25);
}
// reached the end of the updated offsets and paused
Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
Assert.assertEquals(newEndOffsets, task.getRunner().getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
// try again but with resume flag == true
newEndOffsets = ImmutableMap.of(0, 7L);
task.setEndOffsets(newEndOffsets, true, true);
Assert.assertEquals(newEndOffsets, task.getEndOffsets());
Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
task.getRunner().setEndOffsets(newEndOffsets, true, true);
Assert.assertEquals(newEndOffsets, task.getRunner().getEndOffsets());
Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(25);
}
Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
Assert.assertEquals(newEndOffsets, task.getRunner().getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
task.resume();
task.getRunner().resume();
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(4, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(2, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
@ -1776,13 +1776,13 @@ public class KafkaIndexTaskTest
runTask(task);
while (!task.getStatus().equals(KafkaIndexTask.Status.READING)) {
while (!task.getRunner().getStatus().equals(KafkaIndexTask.Status.READING)) {
Thread.sleep(2000);
}
task.pause(0);
task.getRunner().pause(0);
while (!task.getStatus().equals(KafkaIndexTask.Status.PAUSED)) {
while (!task.getRunner().getStatus().equals(KafkaIndexTask.Status.PAUSED)) {
Thread.sleep(25);
}
}
@ -1815,14 +1815,14 @@ public class KafkaIndexTaskTest
runTask(task);
while (!task.getStatus().equals(KafkaIndexTask.Status.READING)) {
while (!task.getRunner().getStatus().equals(KafkaIndexTask.Status.READING)) {
Thread.sleep(20);
}
for (int i = 0; i < 5; i++) {
Assert.assertEquals(task.getStatus(), KafkaIndexTask.Status.READING);
Assert.assertEquals(task.getRunner().getStatus(), KafkaIndexTask.Status.READING);
// Offset should not be reset
Assert.assertTrue(task.getCurrentOffsets().get(0) == 200L);
Assert.assertTrue(task.getRunner().getCurrentOffsets().get(0) == 200L);
}
}
@ -1874,9 +1874,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);

View File

@ -22,6 +22,7 @@ package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.IngestionState;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
@ -37,13 +38,14 @@ public class IngestionStatsAndErrorsTaskReportData
private Map<String, Object> rowStats;
@JsonProperty
@Nullable
private String errorMsg;
public IngestionStatsAndErrorsTaskReportData(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@JsonProperty("rowStats") Map<String, Object> rowStats,
@JsonProperty("errorMsg") String errorMsg
@JsonProperty("errorMsg") @Nullable String errorMsg
)
{
this.ingestionState = ingestionState;
@ -71,6 +73,7 @@ public class IngestionStatsAndErrorsTaskReportData
}
@JsonProperty
@Nullable
public String getErrorMsg()
{
return errorMsg;