diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 5cb3232cf88..ca16a5f0f85 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -168,6 +168,7 @@ For Roaring bitmaps: |`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.|no (default == false)| |`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)| |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)| +|`skipOffsetGaps`|Boolean|Whether or not to allow gaps of missing offsets in the Kafka stream. This is required for compatibility with implementations such as MapR Streams, which does not guarantee consecutive offsets. 
If this is false, an exception will be thrown if offsets are not consecutive.|no (default == false)| ## Supervisor API diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index 55e5cdb7876..f94b18103a1 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -32,6 +32,7 @@ public class KafkaIOConfig implements IOConfig { private static final boolean DEFAULT_USE_TRANSACTION = true; private static final boolean DEFAULT_PAUSE_AFTER_READ = false; + private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false; private final String baseSequenceName; private final KafkaPartitions startPartitions; @@ -40,6 +41,7 @@ public class KafkaIOConfig implements IOConfig private final boolean useTransaction; private final boolean pauseAfterRead; private final Optional minimumMessageTime; + private final boolean skipOffsetGaps; @JsonCreator public KafkaIOConfig( @@ -49,7 +51,8 @@ public class KafkaIOConfig implements IOConfig @JsonProperty("consumerProperties") Map consumerProperties, @JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("pauseAfterRead") Boolean pauseAfterRead, - @JsonProperty("minimumMessageTime") DateTime minimumMessageTime + @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, + @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps ) { this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); @@ -59,6 +62,7 @@ public class KafkaIOConfig implements IOConfig this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; this.pauseAfterRead = pauseAfterRead != null ? 
pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); + this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS; Preconditions.checkArgument( startPartitions.getTopic().equals(endPartitions.getTopic()), @@ -122,6 +126,12 @@ public class KafkaIOConfig implements IOConfig return minimumMessageTime; } + @JsonProperty + public boolean isSkipOffsetGaps() + { + return skipOffsetGaps; + } + @Override public String toString() { @@ -133,6 +143,7 @@ public class KafkaIOConfig implements IOConfig ", useTransaction=" + useTransaction + ", pauseAfterRead=" + pauseAfterRead + ", minimumMessageTime=" + minimumMessageTime + + ", skipOffsetGaps=" + skipOffsetGaps + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index a172530fbf0..0d199a83e06 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -405,12 +405,21 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler if (record.offset() < endOffsets.get(record.partition())) { if (record.offset() != nextOffsets.get(record.partition())) { - throw new ISE( - "WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", - record.offset(), - nextOffsets.get(record.partition()), - record.partition() - ); + if (ioConfig.isSkipOffsetGaps()) { + log.warn( + "Skipped to offset[%,d] after offset[%,d] in partition[%d].", + record.offset(), + nextOffsets.get(record.partition()), + record.partition() + ); + } else { + throw new ISE( + "WTF?! 
Got offset[%,d] after offset[%,d] in partition[%d].", + record.offset(), + nextOffsets.get(record.partition()), + record.partition() + ); + } } try { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 551a28af6c6..3634ece1faf 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1401,7 +1401,8 @@ public class KafkaSupervisor implements Supervisor consumerProperties, true, false, - minimumMessageTime + minimumMessageTime, + ioConfig.isSkipOffsetGaps() ); for (int i = 0; i < replicas; i++) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index 516d9493c1e..395531b59c3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -39,9 +39,10 @@ public class KafkaSupervisorIOConfig private final Map consumerProperties; private final Duration startDelay; private final Duration period; - private final Boolean useEarliestOffset; + private final boolean useEarliestOffset; private final Duration completionTimeout; private final Optional lateMessageRejectionPeriod; + private final boolean skipOffsetGaps; @JsonCreator public KafkaSupervisorIOConfig( @@ -54,7 +55,8 @@ public class KafkaSupervisorIOConfig @JsonProperty("period") Period period, @JsonProperty("useEarliestOffset") Boolean useEarliestOffset, 
@JsonProperty("completionTimeout") Period completionTimeout, - @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod + @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod, + @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps ) { this.topic = Preconditions.checkNotNull(topic, "topic"); @@ -64,16 +66,17 @@ public class KafkaSupervisorIOConfig String.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) ); - this.replicas = (replicas != null ? replicas : 1); - this.taskCount = (taskCount != null ? taskCount : 1); + this.replicas = replicas != null ? replicas : 1; + this.taskCount = taskCount != null ? taskCount : 1; this.taskDuration = defaultDuration(taskDuration, "PT1H"); this.startDelay = defaultDuration(startDelay, "PT5S"); this.period = defaultDuration(period, "PT30S"); - this.useEarliestOffset = (useEarliestOffset != null ? useEarliestOffset : false); + this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : false; this.completionTimeout = defaultDuration(completionTimeout, "PT30M"); - this.lateMessageRejectionPeriod = (lateMessageRejectionPeriod == null - ? Optional.absent() - : Optional.of(lateMessageRejectionPeriod.toStandardDuration())); + this.lateMessageRejectionPeriod = lateMessageRejectionPeriod == null + ? Optional.absent() + : Optional.of(lateMessageRejectionPeriod.toStandardDuration()); + this.skipOffsetGaps = skipOffsetGaps != null ? 
skipOffsetGaps : false; } @JsonProperty @@ -119,7 +122,7 @@ public class KafkaSupervisorIOConfig } @JsonProperty - public Boolean isUseEarliestOffset() + public boolean isUseEarliestOffset() { return useEarliestOffset; } @@ -136,6 +139,12 @@ public class KafkaSupervisorIOConfig return lateMessageRejectionPeriod; } + @JsonProperty + public boolean isSkipOffsetGaps() + { + return skipOffsetGaps; + } + @Override public String toString() { @@ -150,6 +159,7 @@ public class KafkaSupervisorIOConfig ", useEarliestOffset=" + useEarliestOffset + ", completionTimeout=" + completionTimeout + ", lateMessageRejectionPeriod=" + lateMessageRejectionPeriod + + ", skipOffsetGaps=" + skipOffsetGaps + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java index de63ab14904..c18cc958805 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java @@ -74,6 +74,7 @@ public class KafkaIOConfigTest Assert.assertEquals(true, config.isUseTransaction()); Assert.assertEquals(false, config.isPauseAfterRead()); Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent()); + Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps()); } @Test @@ -87,7 +88,8 @@ public class KafkaIOConfigTest + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" + " \"pauseAfterRead\": true,\n" - + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + + " \"skipOffsetGaps\": true\n" + "}"; KafkaIOConfig config = (KafkaIOConfig) mapper.readValue( @@ -108,6 +110,7 @@ public class KafkaIOConfigTest Assert.assertEquals(false, config.isUseTransaction()); 
Assert.assertEquals(true, config.isPauseAfterRead()); Assert.assertEquals(new DateTime("2016-05-31T12:00Z"), config.getMinimumMessageTime().get()); + Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 2132d0623bd..f6acd7dc98e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -304,7 +304,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -346,7 +347,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -400,7 +402,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - new DateTime("2010") + new DateTime("2010"), + false ), null, null @@ -461,7 +464,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -502,7 +506,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -554,7 +559,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -605,7 +611,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -638,7 +645,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -652,7 +660,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -706,7 +715,8 @@ public class KafkaIndexTaskTest 
kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -720,7 +730,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -775,7 +786,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), false, false, - null + null, + false ), null, null @@ -789,7 +801,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), false, false, - null + null, + false ), null, null @@ -849,7 +862,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -906,7 +920,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -920,7 +935,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -976,7 +992,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -1011,7 +1028,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -1063,7 +1081,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -1146,7 +1165,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, true, - null + null, + false ), null, null @@ -1233,7 +1253,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, null @@ -1271,7 +1292,8 @@ public class KafkaIndexTaskTest kafkaServer.consumerProperties(), true, false, - null + null, + false ), null, true diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 
e94f4a740fe..b453e705cfa 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -73,6 +73,7 @@ public class KafkaSupervisorIOConfigTest Assert.assertEquals(false, config.isUseEarliestOffset()); Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout()); Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent()); + Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps()); } @Test @@ -89,7 +90,8 @@ public class KafkaSupervisorIOConfigTest + " \"period\": \"PT10S\",\n" + " \"useEarliestOffset\": true,\n" + " \"completionTimeout\": \"PT45M\",\n" - + " \"lateMessageRejectionPeriod\": \"PT1H\"\n" + + " \"lateMessageRejectionPeriod\": \"PT1H\",\n" + + " \"skipOffsetGaps\": true\n" + "}"; KafkaSupervisorIOConfig config = mapper.readValue( @@ -111,6 +113,7 @@ public class KafkaSupervisorIOConfigTest Assert.assertEquals(true, config.isUseEarliestOffset()); Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout()); Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get()); + Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 0e8ba387113..17507339da3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -201,7 +201,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public 
void testNoInitialState() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); addSomeEvents(1); Capture captured = Capture.newInstance(); @@ -232,6 +232,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead()); Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); + Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps()); Assert.assertEquals(KAFKA_TOPIC, taskConfig.getStartPartitions().getTopic()); Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0)); @@ -244,10 +245,39 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(2)); } + @Test + public void testSkipOffsetGaps() throws Exception + { + supervisor = getSupervisor(1, 1, true, "PT1H", null, true); + addSomeEvents(1); + + Capture captured = Capture.newInstance(); + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + KafkaIndexTask task = captured.getValue(); + KafkaIOConfig taskConfig = task.getIOConfig(); + + Assert.assertTrue("skipOffsetGaps", taskConfig.isSkipOffsetGaps()); + } + @Test public void testMultiTask() throws 
Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null); + supervisor = getSupervisor(1, 2, true, "PT1H", null, false); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -284,7 +314,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testReplicas() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null); + supervisor = getSupervisor(2, 1, true, "PT1H", null, false); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -321,7 +351,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testLateMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H")); + supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), false); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -363,7 +393,7 @@ public class KafkaSupervisorTest extends EasyMockSupport */ public void testLatestOffset() throws Exception { - supervisor = getSupervisor(1, 1, false, "PT1H", null); + supervisor = getSupervisor(1, 1, false, "PT1H", null, false); addSomeEvents(1100); Capture captured = Capture.newInstance(); @@ -395,7 +425,7 @@ public class KafkaSupervisorTest extends EasyMockSupport */ public void testDatasourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); addSomeEvents(100); Capture captured = Capture.newInstance(); @@ -425,7 +455,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test(expected = ISE.class) public void testBadMetadataOffsets() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); addSomeEvents(1); expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); @@ -444,7 +474,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void 
testKillIncompatibleTasks() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null); + supervisor = getSupervisor(2, 1, true, "PT1H", null, false); addSomeEvents(1); Task id1 = createKafkaIndexTask( // unexpected # of partitions (kill) @@ -530,7 +560,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testKillBadPartitionAssignment() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null); + supervisor = getSupervisor(1, 2, true, "PT1H", null, false); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -614,7 +644,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testRequeueTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null); + supervisor = getSupervisor(2, 2, true, "PT1H", null, false); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -683,7 +713,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testRequeueAdoptedTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null); + supervisor = getSupervisor(2, 1, true, "PT1H", null, false); addSomeEvents(1); DateTime now = DateTime.now(); @@ -759,7 +789,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testQueueNextTasksOnSuccess() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null); + supervisor = getSupervisor(2, 2, true, "PT1H", null, false); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -830,7 +860,7 @@ public class KafkaSupervisorTest extends EasyMockSupport { final TaskLocation location = new TaskLocation("testHost", 1234); - supervisor = getSupervisor(2, 2, true, "PT1M", null); + supervisor = getSupervisor(2, 2, true, "PT1M", null, false); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -913,7 +943,7 @@ public class KafkaSupervisorTest extends EasyMockSupport { final 
TaskLocation location = new TaskLocation("testHost", 1234); - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -1002,7 +1032,7 @@ public class KafkaSupervisorTest extends EasyMockSupport { final TaskLocation location = new TaskLocation("testHost", 1234); - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -1093,7 +1123,7 @@ public class KafkaSupervisorTest extends EasyMockSupport final TaskLocation location2 = new TaskLocation("testHost2", 145); final DateTime startTime = new DateTime(); - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -1178,7 +1208,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null); + supervisor = getSupervisor(2, 2, true, "PT1H", null, false); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1223,7 +1253,7 @@ public class KafkaSupervisorTest extends EasyMockSupport { final TaskLocation location = new TaskLocation("testHost", 1234); - supervisor = getSupervisor(2, 2, true, "PT1M", null); + supervisor = getSupervisor(2, 2, true, "PT1M", null, false); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1290,7 +1320,7 @@ public class KafkaSupervisorTest extends EasyMockSupport { final TaskLocation location = new TaskLocation("testHost", 1234); - supervisor = getSupervisor(2, 2, true, "PT1M", null); + supervisor = getSupervisor(2, 2, true, "PT1M", null, false); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1363,7 +1393,7 @@ public class 
KafkaSupervisorTest extends EasyMockSupport @Test(expected = IllegalStateException.class) public void testStopNotStarted() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); supervisor.stop(false); } @@ -1375,7 +1405,7 @@ public class KafkaSupervisorTest extends EasyMockSupport taskRunner.unregisterListener(String.format("KafkaSupervisor-%s", DATASOURCE)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); supervisor.start(); supervisor.stop(false); @@ -1389,7 +1419,7 @@ public class KafkaSupervisorTest extends EasyMockSupport final TaskLocation location2 = new TaskLocation("testHost2", 145); final DateTime startTime = new DateTime(); - supervisor = getSupervisor(2, 1, true, "PT1H", null); + supervisor = getSupervisor(2, 1, true, "PT1H", null, false); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -1470,7 +1500,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testResetNoTasks() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -1494,7 +1524,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testResetDataSourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ 
-1542,7 +1572,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testResetNoDataSourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null); + supervisor = getSupervisor(1, 1, true, "PT1H", null, false); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -1575,7 +1605,7 @@ public class KafkaSupervisorTest extends EasyMockSupport final TaskLocation location2 = new TaskLocation("testHost2", 145); final DateTime startTime = new DateTime(); - supervisor = getSupervisor(2, 1, true, "PT1H", null); + supervisor = getSupervisor(2, 1, true, "PT1H", null, false); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -1671,7 +1701,8 @@ public class KafkaSupervisorTest extends EasyMockSupport int taskCount, boolean useEarliestOffset, String duration, - Period lateMessageRejectionPeriod + Period lateMessageRejectionPeriod, + boolean skipOffsetGaps ) { KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( @@ -1684,7 +1715,8 @@ public class KafkaSupervisorTest extends EasyMockSupport new Period("PT30S"), useEarliestOffset, new Period("PT30M"), - lateMessageRejectionPeriod + lateMessageRejectionPeriod, + skipOffsetGaps ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null) @@ -1782,7 +1814,8 @@ public class KafkaSupervisorTest extends EasyMockSupport ImmutableMap.of(), true, false, - minimumMessageTime + minimumMessageTime, + false ), ImmutableMap.of(), null