Fix testIncrementalHandOffReadsThroughEndOffsets in Kafka/KinesisIndexTaskTest (#7264)

* Fix testIncrementalHandOffReadsThroughEndOffsets in Kafka/KinesisIndexTaskTest

* revert unnecessary change

* fix test

* remove debug log
This commit is contained in:
Jihoon Son 2019-03-14 20:31:08 -07:00 committed by Gian Merlino
parent 7ada1c49f9
commit 69a6f1154a
2 changed files with 188 additions and 169 deletions

View File

@@ -85,6 +85,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -909,11 +910,11 @@ public class KafkaIndexTaskTest
final SeekableStreamPartitions<Integer, Long> checkpoint1 =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L));
final SeekableStreamPartitions<Integer, Long> checkpoint2 =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L));
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L));
final SeekableStreamPartitions<Integer, Long> endPartitions =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, Long.MAX_VALUE));
final KafkaIndexTask task = createTask(
final KafkaIndexTask normalReplica = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
@@ -927,34 +928,68 @@ public class KafkaIndexTaskTest
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
while (task.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
// Simulating the case when another replica has consumed up to the offset of 8
task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false);
// The task is supposed to consume remaining rows up to the offset of 13
while (task.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
task.getRunner().setEndOffsets(
ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L),
true
final KafkaIndexTask staleReplica = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
baseSequenceName,
startPartitions,
endPartitions,
consumerProps,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null
)
);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
final ListenableFuture<TaskStatus> staleReplicaFuture = ListenableFutures.transformAsync(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
this::runTask
);
// processed count would be 8 if it stopped at its current offsets
Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
staleReplica.getRunner().pause();
while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
normalReplica.getRunner().setEndOffsets(currentOffsets, false);
staleReplica.getRunner().setEndOffsets(currentOffsets, false);
while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
currentOffsets = ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
normalReplica.getRunner().setEndOffsets(currentOffsets, true);
staleReplica.getRunner().setEndOffsets(currentOffsets, true);
Assert.assertEquals(TaskState.SUCCESS, normalReplicaFuture.get().getStatusCode());
Assert.assertEquals(TaskState.SUCCESS, staleReplicaFuture.get().getStatusCode());
Assert.assertEquals(9, normalReplica.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(9, staleReplica.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, staleReplica.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, staleReplica.getRunner().getRowIngestionMeters().getThrownAway());
}
@Test(timeout = 60_000L)

View File

@@ -91,6 +91,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -133,8 +134,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
@@ -2557,6 +2556,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
"task1",
DATA_SCHEMA,
new KinesisIndexTaskIOConfig(
null,
"sequence0",
@@ -2616,62 +2616,54 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 2;
maxRecordsPerPoll = 1;
recordSupplier.assign(anyObject());
final KinesisRecordSupplier recordSupplier1 = mock(KinesisRecordSupplier.class);
recordSupplier1.assign(anyObject());
expectLastCall().anyTimes();
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
recordSupplier.seek(anyObject(), anyString());
expect(recordSupplier1.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
recordSupplier1.seek(anyObject(), anyString());
expectLastCall().anyTimes();
expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5))
expect(recordSupplier1.poll(anyLong())).andReturn(records.subList(0, 5))
.once()
.andReturn(records.subList(4, 10))
.once()
.andReturn(records.subList(9, 15))
.once();
recordSupplier.close();
recordSupplier1.close();
expectLastCall().once();
final KinesisRecordSupplier recordSupplier2 = mock(KinesisRecordSupplier.class);
recordSupplier2.assign(anyObject());
expectLastCall().anyTimes();
expect(recordSupplier2.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
recordSupplier2.seek(anyObject(), anyString());
expectLastCall().anyTimes();
expect(recordSupplier2.poll(anyLong())).andReturn(records.subList(0, 5))
.once()
.andReturn(records.subList(4, 10))
.once();
recordSupplier2.close();
expectLastCall().once();
replayAll();
final SeekableStreamPartitions<String, String> startPartitions = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(
shardId1,
"0"
)
ImmutableMap.of(shardId1, "0")
);
final SeekableStreamPartitions<String, String> checkpoint1 = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(
shardId1,
"4"
)
ImmutableMap.of(shardId1, "4")
);
final SeekableStreamPartitions<String, String> checkpoint2 = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(
shardId1,
"9"
)
ImmutableMap.of(shardId1, "9")
);
final SeekableStreamPartitions<String, String> endPartitions = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(
shardId1,
"14"
)
ImmutableMap.of(shardId1, "100") // simulating unlimited
);
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig(
null,
baseSequenceName,
startPartitions,
@@ -2686,55 +2678,85 @@ public class KinesisIndexTaskTest extends EasyMockSupport
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
Thread.sleep(10);
}
Map<String, String> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
task.getRunner().setEndOffsets(currentOffsets, false);
final KinesisIndexTask normalReplica = createTask(
null,
DATA_SCHEMA,
ioConfig,
null
);
((TestableKinesisIndexTask) normalReplica).setLocalSupplier(recordSupplier1);
final KinesisIndexTask staleReplica = createTask(
null,
DATA_SCHEMA,
ioConfig,
null
);
((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2);
final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
final ListenableFuture<TaskStatus> staleReplicaFuture = ListenableFutures.transformAsync(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
this::runTask
);
// The task is supposed to consume remaining rows up to the offset of 13
while (task.getRunner().getStatus() != Status.PAUSED) {
while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
staleReplica.getRunner().pause();
while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
Map<String, String> currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
normalReplica.getRunner().setEndOffsets(currentOffsets, false);
staleReplica.getRunner().setEndOffsets(currentOffsets, false);
while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
currentOffsets = ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
task.getRunner().setEndOffsets(
ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)),
true
);
normalReplica.getRunner().setEndOffsets(currentOffsets, true);
staleReplica.getRunner().setEndOffsets(currentOffsets, true);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(TaskState.SUCCESS, normalReplicaFuture.get().getStatusCode());
Assert.assertEquals(TaskState.SUCCESS, staleReplicaFuture.get().getStatusCode());
verifyAll();
Assert.assertEquals(2, checkpointRequestsHash.size());
// Check metrics
Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(10, normalReplica.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
final Set<SegmentDescriptor> descriptors = new HashSet<>();
descriptors.add(sd(task, "2008/P1D", 0));
descriptors.add(sd(task, "2008/P1D", 1));
descriptors.add(sd(task, "2009/P1D", 0));
descriptors.add(sd(task, "2010/P1D", 0));
descriptors.add(sd(task, "2010/P1D", 1));
descriptors.add(sd(task, "2011/P1D", 0));
descriptors.add(sd(task, "2011/P1D", 1));
descriptors.add(sd(task, "2012/P1D", 0));
descriptors.add(sd(task, "2013/P1D", 0));
descriptors.add(sd(normalReplica, "2008/P1D", 0));
descriptors.add(sd(normalReplica, "2009/P1D", 0));
descriptors.add(sd(normalReplica, "2010/P1D", 0));
descriptors.add(sd(normalReplica, "2010/P1D", 1));
descriptors.add(sd(normalReplica, "2011/P1D", 0));
descriptors.add(sd(normalReplica, "2011/P1D", 1));
descriptors.add(sd(normalReplica, "2012/P1D", 0));
descriptors.add(sd(normalReplica, "2013/P1D", 0));
Assert.assertEquals(descriptors, publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
shardId1,
"10"
"9"
))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
@@ -2791,16 +2813,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTaskIOConfig ioConfig
)
{
return createTask(taskId, DATA_SCHEMA, ioConfig);
}
private KinesisIndexTask createTask(
final String taskId,
final KinesisIndexTaskIOConfig ioConfig,
final Map<String, Object> context
)
{
return createTask(taskId, DATA_SCHEMA, ioConfig, context);
return createTask(taskId, DATA_SCHEMA, ioConfig, null);
}
private KinesisIndexTask createTask(
@@ -2809,55 +2822,14 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTaskIOConfig ioConfig
)
{
final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
1000,
null,
maxRowsPerSegment,
maxTotalRows,
new Period("P1Y"),
null,
null,
null,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
skipAvailabilityCheck,
null,
null,
null,
5000,
null,
null,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
maxRecordsPerPoll,
intermediateHandoffPeriod
);
final Map<String, Object> context = null;
final KinesisIndexTask task = new TestableKinesisIndexTask(
taskId,
null,
cloneDataSchema(dataSchema),
tuningConfig,
ioConfig,
context,
null,
null,
rowIngestionMetersFactory,
null
);
return task;
return createTask(taskId, dataSchema, ioConfig, null);
}
private KinesisIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KinesisIndexTaskIOConfig ioConfig,
final Map<String, Object> context
@Nullable final Map<String, Object> context
)
{
final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
@@ -2886,7 +2858,20 @@ public class KinesisIndexTaskTest extends EasyMockSupport
maxRecordsPerPoll,
intermediateHandoffPeriod
);
return createTask(taskId, dataSchema, ioConfig, tuningConfig, context);
}
private KinesisIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KinesisIndexTaskIOConfig ioConfig,
final KinesisIndexTaskTuningConfig tuningConfig,
@Nullable final Map<String, Object> context
)
{
if (context != null) {
context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
}
final KinesisIndexTask task = new TestableKinesisIndexTask(
taskId,
@@ -3059,14 +3044,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return new ArrayList<>();
}
};
toolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
@@ -3242,8 +3219,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
@JsonTypeName("index_kinesis")
private static class TestableKinesisIndexTask extends KinesisIndexTask
{
private KinesisRecordSupplier localSupplier;
@JsonCreator
public TestableKinesisIndexTask(
private TestableKinesisIndexTask(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("dataSchema") DataSchema dataSchema,
@@ -3270,10 +3249,15 @@ public class KinesisIndexTaskTest extends EasyMockSupport
);
}
private void setLocalSupplier(KinesisRecordSupplier recordSupplier)
{
this.localSupplier = recordSupplier;
}
@Override
protected KinesisRecordSupplier newTaskRecordSupplier()
{
return recordSupplier;
return localSupplier == null ? recordSupplier : localSupplier;
}
}