Test reading from empty kafka/kinesis partitions (#9729)

* add test for stream sequence number returns null

* fix checkstyle

* add index test for when stream returns null

* retrigger test
This commit is contained in:
Maytas Monsereenusorn 2020-04-27 07:23:56 -10:00 committed by GitHub
parent fe000a9e4b
commit 8b78eebdbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 262 additions and 4 deletions

View File

@ -136,6 +136,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -2365,6 +2366,47 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
}
@Test(timeout = 60_000L)
public void testRunWithoutDataInserted() throws Exception
{
final KafkaIndexTask task = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
INPUT_FORMAT
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
Thread.sleep(1000);
Assert.assertEquals(0, countEvents(task));
Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus());
task.getRunner().stopGracefully();
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(Collections.emptyList(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());
}
@Test
public void testSerde() throws Exception
{

View File

@ -534,6 +534,54 @@ public class KafkaRecordSupplierTest
recordSupplier.close();
}
@Test
public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER);
StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
Assert.assertEquals(new Long(0), recordSupplier.getLatestSequenceNumber(streamPartition));
}
@Test
public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER);
StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
Assert.assertEquals(new Long(0), recordSupplier.getEarliestSequenceNumber(streamPartition));
}
@Test
public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER);
StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToLatest(partitions);
Assert.assertEquals(new Long(0), recordSupplier.getLatestSequenceNumber(streamPartition));
}
@Test
public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER);
StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToLatest(partitions);
Assert.assertEquals(new Long(0), recordSupplier.getEarliestSequenceNumber(streamPartition));
}
private void insertData() throws ExecutionException, InterruptedException
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {

View File

@ -780,6 +780,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
private String getSequenceNumberInternal(StreamPartition<String> partition, String shardIterator)
{
long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout;
GetRecordsResult recordsResult = null;
while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) {
@ -787,8 +788,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
return null;
}
GetRecordsResult recordsResult;
try {
// we call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard.
// In the case where the shard is constantly removing records that are past their retention period, it is possible
@ -832,8 +831,9 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
// if we reach here, it usually means either the shard has no more records, or records have not been
// added to this shard
log.warn(
"timed out while trying to fetch position for shard[%s], likely no more records in shard",
partition.getPartitionId()
"timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard",
partition.getPartitionId(),
recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN"
);
return null;

View File

@ -2654,6 +2654,68 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
);
}
@Test(timeout = 60_000L)
public void testRunWithoutDataInserted() throws Exception
{
recordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(recordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(Collections.emptyList()).times(1, Integer.MAX_VALUE);
recordSupplier.close();
EasyMock.expectLastCall().once();
replayAll();
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1")),
true,
null,
null,
INPUT_FORMAT,
"awsEndpoint",
null,
null,
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
Thread.sleep(1000);
Assert.assertEquals(0, countEvents(task));
Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus());
task.getRunner().stopGracefully();
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyAll();
// Check metrics
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(Collections.emptyList(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());
}
private KinesisIndexTask createTask(
final String taskId,
final KinesisIndexTaskIOConfig ioConfig

View File

@ -643,6 +643,59 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag());
}
@Test
public void getLatestSequenceNumberWhenShardIsEmptyShouldReturnsNull()
{
KinesisRecordSupplier recordSupplier = getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper();
Assert.assertNull(recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
verifyAll();
}
@Test
public void getEarliestSequenceNumberWhenShardIsEmptyShouldReturnsNull()
{
KinesisRecordSupplier recordSupplier = getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper();
Assert.assertNull(recordSupplier.getEarliestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
verifyAll();
}
private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper()
{
EasyMock.expect(kinesis.getShardIterator(
EasyMock.eq(STREAM),
EasyMock.eq(SHARD_ID0),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult0).anyTimes();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000)))
.andReturn(getRecordsResult0)
.times(1, Integer.MAX_VALUE);
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(Collections.emptyList()).times(1, Integer.MAX_VALUE);
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(SHARD0_ITERATOR).times(1, Integer.MAX_VALUE);
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(0L).once();
replayAll();
recordSupplier = new KinesisRecordSupplier(
kinesis,
recordsPerFetch,
0,
2,
true,
100,
5000,
5000,
1000,
100
);
return recordSupplier;
}
@Test
public void getPartitionTimeLag() throws InterruptedException

View File

@ -58,6 +58,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@ -202,6 +203,58 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
verifyAll();
}
@Test
public void testRunningStreamGetSequenceNumberReturnsNull() throws Exception
{
EasyMock.reset(recordSupplier);
EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
replayAll();
SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
supervisor.runInternal();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
List<SupervisorStateManager.ExceptionEvent> exceptionEvents = supervisor.stateManager.getExceptionEvents();
Assert.assertEquals(1, exceptionEvents.size());
Assert.assertFalse(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
Assert.assertEquals(ISE.class.getName(), exceptionEvents.get(0).getExceptionClass());
Assert.assertEquals(StringUtils.format("unable to fetch sequence number for partition[%s] from stream", SHARD_ID), exceptionEvents.get(0).getMessage());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
supervisor.runInternal();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
supervisor.runInternal();
Assert.assertFalse(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
verifyAll();
}
@Test
public void testConnectingToStreamFail() throws Exception
{