From 8b78eebdbdab87cd663f3ddf5c5fb7f6e9bcdc34 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn <52679095+maytasm@users.noreply.github.com> Date: Mon, 27 Apr 2020 07:23:56 -1000 Subject: [PATCH] Test reading from empty kafka/kinesis partitions (#9729) * add test for stream sequence number returns null * fix checkstyle * add index test for when stream returns null * retrigger test --- .../indexing/kafka/KafkaIndexTaskTest.java | 42 +++++++++++++ .../kafka/KafkaRecordSupplierTest.java | 48 ++++++++++++++ .../kinesis/KinesisRecordSupplier.java | 8 +-- .../kinesis/KinesisIndexTaskTest.java | 62 +++++++++++++++++++ .../kinesis/KinesisRecordSupplierTest.java | 53 ++++++++++++++++ .../SeekableStreamSupervisorStateTest.java | 53 ++++++++++++++++ 6 files changed, 262 insertions(+), 4 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index d4468bd39e0..eaa2f99a1a5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -136,6 +136,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -2365,6 +2366,47 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); } + @Test(timeout = 60_000L) + public void testRunWithoutDataInserted() throws Exception + { + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + INPUT_FORMAT + ) + ); + + final ListenableFuture future = runTask(task); + + Thread.sleep(1000); + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus()); + + task.getRunner().stopGracefully(); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published metadata and segments in deep storage + assertEqualsExceptVersion(Collections.emptyList(), publishedDescriptors()); + Assert.assertNull(newDataSchemaMetadata()); + } + @Test public void testSerde() throws Exception { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index c4739abc4de..d1580335232 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -534,6 +534,54 @@ public class KafkaRecordSupplierTest recordSupplier.close(); } + @Test + public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() + { + KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( + kafkaServer.consumerProperties(), OBJECT_MAPPER); + StreamPartition streamPartition = StreamPartition.of(topic, 0); + Set> partitions = ImmutableSet.of(streamPartition); + recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); + Assert.assertEquals(new Long(0), recordSupplier.getLatestSequenceNumber(streamPartition)); + } + + @Test + public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() + { + KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( + kafkaServer.consumerProperties(), OBJECT_MAPPER); + StreamPartition streamPartition = StreamPartition.of(topic, 0); + Set> partitions = ImmutableSet.of(streamPartition); + recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); + Assert.assertEquals(new Long(0), recordSupplier.getEarliestSequenceNumber(streamPartition)); + } + + @Test + public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() + { + KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( + kafkaServer.consumerProperties(), OBJECT_MAPPER); + StreamPartition streamPartition = StreamPartition.of(topic, 0); + Set> partitions = ImmutableSet.of(streamPartition); + recordSupplier.assign(partitions); + recordSupplier.seekToLatest(partitions); + Assert.assertEquals(new Long(0), recordSupplier.getLatestSequenceNumber(streamPartition)); + } + + @Test + public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() + { + KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( + kafkaServer.consumerProperties(), OBJECT_MAPPER); + StreamPartition streamPartition = StreamPartition.of(topic, 0); + Set> partitions = ImmutableSet.of(streamPartition); + recordSupplier.assign(partitions); + recordSupplier.seekToLatest(partitions); + Assert.assertEquals(new Long(0), recordSupplier.getEarliestSequenceNumber(streamPartition)); + } + private void insertData() throws ExecutionException, InterruptedException { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index e7ec59a7344..f4b78304d17 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -780,6 +780,7 @@ public class KinesisRecordSupplier implements RecordSupplier private String getSequenceNumberInternal(StreamPartition partition, String shardIterator) { long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout; + GetRecordsResult recordsResult = null; while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) { @@ -787,8 +788,6 @@ public class KinesisRecordSupplier implements RecordSupplier log.info("KinesisRecordSupplier closed while fetching sequenceNumber"); return null; } - - GetRecordsResult recordsResult; try { // we call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard. // In the case where the shard is constantly removing records that are past their retention period, it is possible @@ -832,8 +831,9 @@ public class KinesisRecordSupplier implements RecordSupplier // if we reach here, it usually means either the shard has no more records, or records have not been // added to this shard log.warn( - "timed out while trying to fetch position for shard[%s], likely no more records in shard", - partition.getPartitionId() + "timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard", + partition.getPartitionId(), + recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN" ); return null; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 03302c0538a..c886065955d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2654,6 +2654,68 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase ); } + @Test(timeout = 60_000L) + public void testRunWithoutDataInserted() throws Exception + { + recordSupplier.assign(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(recordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); + + recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(Collections.emptyList()).times(1, Integer.MAX_VALUE); + + recordSupplier.close(); + EasyMock.expectLastCall().once(); + + replayAll(); + + final KinesisIndexTask task = createTask( + null, + new KinesisIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "0"), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1")), + true, + null, + null, + INPUT_FORMAT, + "awsEndpoint", + null, + null, + null, + null, + false + ) + + ); + + final ListenableFuture future = runTask(task); + + Thread.sleep(1000); + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus()); + + task.getRunner().stopGracefully(); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + verifyAll(); + // Check metrics + Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published metadata and segments in deep storage + assertEqualsExceptVersion(Collections.emptyList(), publishedDescriptors()); + Assert.assertNull(newDataSchemaMetadata()); + } + private KinesisIndexTask createTask( final String taskId, final KinesisIndexTaskIOConfig ioConfig diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 76de9a95139..4a631e84fa6 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -643,6 +643,59 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); } + @Test + public void getLatestSequenceNumberWhenShardIsEmptyShouldReturnsNull() + { + + KinesisRecordSupplier recordSupplier = getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper(); + Assert.assertNull(recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0))); + verifyAll(); + } + + @Test + public void getEarliestSequenceNumberWhenShardIsEmptyShouldReturnsNull() + { + + KinesisRecordSupplier recordSupplier = getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper(); + Assert.assertNull(recordSupplier.getEarliestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0))); + verifyAll(); + } + + private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper() + { + EasyMock.expect(kinesis.getShardIterator( + EasyMock.eq(STREAM), + EasyMock.eq(SHARD_ID0), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult0).anyTimes(); + + EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); + + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000))) + .andReturn(getRecordsResult0) + .times(1, Integer.MAX_VALUE); + + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(Collections.emptyList()).times(1, Integer.MAX_VALUE); + EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(SHARD0_ITERATOR).times(1, Integer.MAX_VALUE); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(0L).once(); + + replayAll(); + + recordSupplier = new KinesisRecordSupplier( + kinesis, + recordsPerFetch, + 0, + 2, + true, + 100, + 5000, + 5000, + 1000, + 100 + ); + return recordSupplier; + } @Test public void getPartitionTimeLag() throws InterruptedException diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 2e5d74af439..d34bbbbec82 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -58,6 +58,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -202,6 +203,58 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport verifyAll(); } + @Test + public void testRunningStreamGetSequenceNumberReturnsNull() throws Exception + { + EasyMock.reset(recordSupplier); + EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes(); + EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + + replayAll(); + + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + supervisor.runInternal(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState()); + List exceptionEvents = supervisor.stateManager.getExceptionEvents(); + Assert.assertEquals(1, exceptionEvents.size()); + Assert.assertFalse(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException()); + Assert.assertEquals(ISE.class.getName(), exceptionEvents.get(0).getExceptionClass()); + Assert.assertEquals(StringUtils.format("unable to fetch sequence number for partition[%s] from stream", SHARD_ID), exceptionEvents.get(0).getMessage()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + supervisor.runInternal(); + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + supervisor.runInternal(); + Assert.assertFalse(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + verifyAll(); + } + @Test public void testConnectingToStreamFail() throws Exception {