add skipOffsetGaps flag (#4256)

David Lim 2017-05-16 12:19:28 -06:00 committed by GitHub
parent 136b2fae72
commit 8333043b7b
9 changed files with 163 additions and 70 deletions


@ -168,6 +168,7 @@ For Roaring bitmaps:
|`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.|no (default == false)|
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`skipOffsetGaps`|Boolean|Whether to allow gaps of missing offsets in the Kafka stream. This is required for compatibility with implementations such as MapR Streams, which do not guarantee consecutive offsets. If this is false, an exception is thrown when a non-consecutive offset is encountered (see the example below).|no (default == false)|
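For illustration, a minimal supervisor `ioConfig` enabling the new flag might look like the sketch below. The topic name is a placeholder, the broker address mirrors the one used in the tests, and the remaining properties are shown at the defaults documented in the table above.

```json
{
  "topic": "my-kafka-topic",
  "consumerProperties": {
    "bootstrap.servers": "localhost:9092"
  },
  "taskCount": 1,
  "replicas": 1,
  "taskDuration": "PT1H",
  "useEarliestOffset": false,
  "completionTimeout": "PT30M",
  "skipOffsetGaps": true
}
```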
## Supervisor API


@ -32,6 +32,7 @@ public class KafkaIOConfig implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
@ -40,6 +41,7 @@ public class KafkaIOConfig implements IOConfig
private final boolean useTransaction;
private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime;
private final boolean skipOffsetGaps;
@JsonCreator
public KafkaIOConfig(
@ -49,7 +51,8 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
)
{
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
@ -59,6 +62,7 @@ public class KafkaIOConfig implements IOConfig
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS;
Preconditions.checkArgument(
startPartitions.getTopic().equals(endPartitions.getTopic()),
@ -122,6 +126,12 @@ public class KafkaIOConfig implements IOConfig
return minimumMessageTime;
}
@JsonProperty
public boolean isSkipOffsetGaps()
{
return skipOffsetGaps;
}
@Override
public String toString()
{
@ -133,6 +143,7 @@ public class KafkaIOConfig implements IOConfig
", useTransaction=" + useTransaction +
", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime +
", skipOffsetGaps=" + skipOffsetGaps +
'}';
}
}


@ -405,12 +405,21 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
if (record.offset() < endOffsets.get(record.partition())) {
if (record.offset() != nextOffsets.get(record.partition())) {
throw new ISE(
"WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
record.offset(),
nextOffsets.get(record.partition()),
record.partition()
);
if (ioConfig.isSkipOffsetGaps()) {
log.warn(
"Skipped to offset[%,d] after offset[%,d] in partition[%d].",
record.offset(),
nextOffsets.get(record.partition()),
record.partition()
);
} else {
throw new ISE(
"WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
record.offset(),
nextOffsets.get(record.partition()),
record.partition()
);
}
}
try {


@ -1401,7 +1401,8 @@ public class KafkaSupervisor implements Supervisor
consumerProperties,
true,
false,
minimumMessageTime
minimumMessageTime,
ioConfig.isSkipOffsetGaps()
);
for (int i = 0; i < replicas; i++) {


@ -39,9 +39,10 @@ public class KafkaSupervisorIOConfig
private final Map<String, String> consumerProperties;
private final Duration startDelay;
private final Duration period;
private final Boolean useEarliestOffset;
private final boolean useEarliestOffset;
private final Duration completionTimeout;
private final Optional<Duration> lateMessageRejectionPeriod;
private final boolean skipOffsetGaps;
@JsonCreator
public KafkaSupervisorIOConfig(
@ -54,7 +55,8 @@ public class KafkaSupervisorIOConfig
@JsonProperty("period") Period period,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@JsonProperty("completionTimeout") Period completionTimeout,
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
)
{
this.topic = Preconditions.checkNotNull(topic, "topic");
@ -64,16 +66,17 @@ public class KafkaSupervisorIOConfig
String.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
);
this.replicas = (replicas != null ? replicas : 1);
this.taskCount = (taskCount != null ? taskCount : 1);
this.replicas = replicas != null ? replicas : 1;
this.taskCount = taskCount != null ? taskCount : 1;
this.taskDuration = defaultDuration(taskDuration, "PT1H");
this.startDelay = defaultDuration(startDelay, "PT5S");
this.period = defaultDuration(period, "PT30S");
this.useEarliestOffset = (useEarliestOffset != null ? useEarliestOffset : false);
this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : false;
this.completionTimeout = defaultDuration(completionTimeout, "PT30M");
this.lateMessageRejectionPeriod = (lateMessageRejectionPeriod == null
? Optional.<Duration>absent()
: Optional.of(lateMessageRejectionPeriod.toStandardDuration()));
this.lateMessageRejectionPeriod = lateMessageRejectionPeriod == null
? Optional.<Duration>absent()
: Optional.of(lateMessageRejectionPeriod.toStandardDuration());
this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
}
@JsonProperty
@ -119,7 +122,7 @@ public class KafkaSupervisorIOConfig
}
@JsonProperty
public Boolean isUseEarliestOffset()
public boolean isUseEarliestOffset()
{
return useEarliestOffset;
}
@ -136,6 +139,12 @@ public class KafkaSupervisorIOConfig
return lateMessageRejectionPeriod;
}
@JsonProperty
public boolean isSkipOffsetGaps()
{
return skipOffsetGaps;
}
@Override
public String toString()
{
@ -150,6 +159,7 @@ public class KafkaSupervisorIOConfig
", useEarliestOffset=" + useEarliestOffset +
", completionTimeout=" + completionTimeout +
", lateMessageRejectionPeriod=" + lateMessageRejectionPeriod +
", skipOffsetGaps=" + skipOffsetGaps +
'}';
}


@ -74,6 +74,7 @@ public class KafkaIOConfigTest
Assert.assertEquals(true, config.isUseTransaction());
Assert.assertEquals(false, config.isPauseAfterRead());
Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
}
@Test
@ -87,7 +88,8 @@ public class KafkaIOConfigTest
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"skipOffsetGaps\": true\n"
+ "}";
KafkaIOConfig config = (KafkaIOConfig) mapper.readValue(
@ -108,6 +110,7 @@ public class KafkaIOConfigTest
Assert.assertEquals(false, config.isUseTransaction());
Assert.assertEquals(true, config.isPauseAfterRead());
Assert.assertEquals(new DateTime("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
}
@Test


@ -304,7 +304,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -346,7 +347,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -400,7 +402,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
new DateTime("2010")
new DateTime("2010"),
false
),
null,
null
@ -461,7 +464,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -502,7 +506,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -554,7 +559,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -605,7 +611,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -638,7 +645,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -652,7 +660,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -706,7 +715,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -720,7 +730,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -775,7 +786,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
false,
false,
null
null,
false
),
null,
null
@ -789,7 +801,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
false,
false,
null
null,
false
),
null,
null
@ -849,7 +862,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -906,7 +920,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -920,7 +935,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -976,7 +992,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -1011,7 +1028,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -1063,7 +1081,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -1146,7 +1165,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
true,
null
null,
false
),
null,
null
@ -1233,7 +1253,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
null
@ -1271,7 +1292,8 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null
null,
false
),
null,
true


@ -73,6 +73,7 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(false, config.isUseEarliestOffset());
Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout());
Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
}
@Test
@ -89,7 +90,8 @@ public class KafkaSupervisorIOConfigTest
+ " \"period\": \"PT10S\",\n"
+ " \"useEarliestOffset\": true,\n"
+ " \"completionTimeout\": \"PT45M\",\n"
+ " \"lateMessageRejectionPeriod\": \"PT1H\"\n"
+ " \"lateMessageRejectionPeriod\": \"PT1H\",\n"
+ " \"skipOffsetGaps\": true\n"
+ "}";
KafkaSupervisorIOConfig config = mapper.readValue(
@ -111,6 +113,7 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(true, config.isUseEarliestOffset());
Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout());
Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get());
Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
}
@Test


@ -201,7 +201,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testNoInitialState() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@ -232,6 +232,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
Assert.assertEquals(KAFKA_TOPIC, taskConfig.getStartPartitions().getTopic());
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
@ -244,10 +245,39 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(2));
}
@Test
public void testSkipOffsetGaps() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null, true);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance();
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
KafkaIndexTask task = captured.getValue();
KafkaIOConfig taskConfig = task.getIOConfig();
Assert.assertTrue("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
}
@Test
public void testMultiTask() throws Exception
{
supervisor = getSupervisor(1, 2, true, "PT1H", null);
supervisor = getSupervisor(1, 2, true, "PT1H", null, false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@ -284,7 +314,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testReplicas() throws Exception
{
supervisor = getSupervisor(2, 1, true, "PT1H", null);
supervisor = getSupervisor(2, 1, true, "PT1H", null, false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@ -321,7 +351,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testLateMessageRejectionPeriod() throws Exception
{
supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"));
supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@ -363,7 +393,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
*/
public void testLatestOffset() throws Exception
{
supervisor = getSupervisor(1, 1, false, "PT1H", null);
supervisor = getSupervisor(1, 1, false, "PT1H", null, false);
addSomeEvents(1100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@ -395,7 +425,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
*/
public void testDatasourceMetadata() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
addSomeEvents(100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@ -425,7 +455,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test(expected = ISE.class)
public void testBadMetadataOffsets() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
addSomeEvents(1);
expect(taskMaster.getTaskRunner()).andReturn(Optional.<TaskRunner>absent()).anyTimes();
@ -444,7 +474,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillIncompatibleTasks() throws Exception
{
supervisor = getSupervisor(2, 1, true, "PT1H", null);
supervisor = getSupervisor(2, 1, true, "PT1H", null, false);
addSomeEvents(1);
Task id1 = createKafkaIndexTask( // unexpected # of partitions (kill)
@ -530,7 +560,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillBadPartitionAssignment() throws Exception
{
supervisor = getSupervisor(1, 2, true, "PT1H", null);
supervisor = getSupervisor(1, 2, true, "PT1H", null, false);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -614,7 +644,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testRequeueTaskWhenFailed() throws Exception
{
supervisor = getSupervisor(2, 2, true, "PT1H", null);
supervisor = getSupervisor(2, 2, true, "PT1H", null, false);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -683,7 +713,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testRequeueAdoptedTaskWhenFailed() throws Exception
{
supervisor = getSupervisor(2, 1, true, "PT1H", null);
supervisor = getSupervisor(2, 1, true, "PT1H", null, false);
addSomeEvents(1);
DateTime now = DateTime.now();
@ -759,7 +789,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testQueueNextTasksOnSuccess() throws Exception
{
supervisor = getSupervisor(2, 2, true, "PT1H", null);
supervisor = getSupervisor(2, 2, true, "PT1H", null, false);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -830,7 +860,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(2, 2, true, "PT1M", null);
supervisor = getSupervisor(2, 2, true, "PT1M", null, false);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -913,7 +943,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
addSomeEvents(1);
Task task = createKafkaIndexTask(
@ -1002,7 +1032,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
addSomeEvents(1);
Task task = createKafkaIndexTask(
@ -1093,7 +1123,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145);
final DateTime startTime = new DateTime();
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -1178,7 +1208,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
{
supervisor = getSupervisor(2, 2, true, "PT1H", null);
supervisor = getSupervisor(2, 2, true, "PT1H", null, false);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -1223,7 +1253,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(2, 2, true, "PT1M", null);
supervisor = getSupervisor(2, 2, true, "PT1M", null, false);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -1290,7 +1320,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(2, 2, true, "PT1M", null);
supervisor = getSupervisor(2, 2, true, "PT1M", null, false);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -1363,7 +1393,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test(expected = IllegalStateException.class)
public void testStopNotStarted() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
supervisor.stop(false);
}
@ -1375,7 +1405,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.unregisterListener(String.format("KafkaSupervisor-%s", DATASOURCE));
replayAll();
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
supervisor.start();
supervisor.stop(false);
@ -1389,7 +1419,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145);
final DateTime startTime = new DateTime();
supervisor = getSupervisor(2, 1, true, "PT1H", null);
supervisor = getSupervisor(2, 1, true, "PT1H", null, false);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -1470,7 +1500,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testResetNoTasks() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@ -1494,7 +1524,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testResetDataSourceMetadata() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@ -1542,7 +1572,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testResetNoDataSourceMetadata() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@ -1575,7 +1605,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145);
final DateTime startTime = new DateTime();
supervisor = getSupervisor(2, 1, true, "PT1H", null);
supervisor = getSupervisor(2, 1, true, "PT1H", null, false);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -1671,7 +1701,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
int taskCount,
boolean useEarliestOffset,
String duration,
Period lateMessageRejectionPeriod
Period lateMessageRejectionPeriod,
boolean skipOffsetGaps
)
{
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
@ -1684,7 +1715,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new Period("PT30S"),
useEarliestOffset,
new Period("PT30M"),
lateMessageRejectionPeriod
lateMessageRejectionPeriod,
skipOffsetGaps
);
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null)
@ -1782,7 +1814,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.<String, String>of(),
true,
false,
minimumMessageTime
minimumMessageTime,
false
),
ImmutableMap.<String, Object>of(),
null