Kafka task minimum message time (#3035)

* add KafkaIndexTask support for minimumMessageTime

* add Kafka supervisor support for lateMessageRejectionPeriod
David Lim 2016-05-31 12:37:00 -06:00 committed by Gian Merlino
parent e662efa79f
commit f6c39cc844
10 changed files with 850 additions and 76 deletions

kafka-ingestion.md

@ -139,6 +139,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`period`|ISO8601 Period|How often the supervisor will execute its management logic. Note that the supervisor will also run in response to certain events (such as tasks succeeding, failing, and reaching their taskDuration) so this value specifies the maximum time between iterations.|no (default == PT30S)|
|`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended, so this flag will only be used on first run.|no (default == false)|
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example, if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
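For clarity, the cutoff arithmetic can be sketched with the same Joda-Time types the implementation uses. This is a minimal sketch, not part of the change; the class and variable names are hypothetical and the values come from the example above.

import org.joda.time.DateTime;
import org.joda.time.Period;

public class LateMessageCutoffSketch
{
  public static void main(String[] args)
  {
    // The supervisor computes the cutoff as (task creation time - lateMessageRejectionPeriod).
    DateTime taskCreationTime = new DateTime("2016-01-01T12:00Z");
    Period lateMessageRejectionPeriod = new Period("PT1H");
    DateTime minimumMessageTime = taskCreationTime.minus(lateMessageRejectionPeriod);

    // The cutoff is one hour before task creation, i.e. 2016-01-01T11:00Z;
    // messages with earlier timestamps are dropped.
    System.out.println(minimumMessageTime.equals(taskCreationTime.minusHours(1))); // true
  }
}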
## Supervisor API

KafkaIOConfig.java

@ -21,8 +21,10 @@ package io.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.druid.segment.indexing.IOConfig;
import org.joda.time.DateTime;
import java.util.Map;
@ -37,6 +39,7 @@ public class KafkaIOConfig implements IOConfig
private final Map<String, String> consumerProperties;
private final boolean useTransaction;
private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime;
@JsonCreator
public KafkaIOConfig(
@ -45,7 +48,8 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime
)
{
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
@ -54,6 +58,7 @@ public class KafkaIOConfig implements IOConfig
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
Preconditions.checkArgument(
startPartitions.getTopic().equals(endPartitions.getTopic()),
@ -111,6 +116,12 @@ public class KafkaIOConfig implements IOConfig
return pauseAfterRead;
}
@JsonProperty
public Optional<DateTime> getMinimumMessageTime()
{
return minimumMessageTime;
}
@Override
public String toString()
{
@ -121,6 +132,7 @@ public class KafkaIOConfig implements IOConfig
", consumerProperties=" + consumerProperties +
", useTransaction=" + useTransaction +
", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime +
'}';
}
}
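A note on backwards compatibility: because the constructor wraps the new parameter with `Optional.fromNullable()`, specs that omit `minimumMessageTime` behave exactly as before. A minimal standalone sketch of that null handling (class name hypothetical):

import com.google.common.base.Optional;
import org.joda.time.DateTime;

public class MinimumMessageTimeDefaultSketch
{
  public static void main(String[] args)
  {
    // A spec without "minimumMessageTime" deserializes to null, which becomes absent(),
    // so no rows are filtered; a present value enables the cutoff.
    Optional<DateTime> fromOldSpec = Optional.fromNullable((DateTime) null);
    Optional<DateTime> fromNewSpec = Optional.fromNullable(new DateTime("2016-05-31T12:00Z"));

    System.out.println(fromOldSpec.isPresent()); // false
    System.out.println(fromNewSpec.isPresent()); // true
  }
}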

KafkaIndexTask.java

@ -51,7 +51,6 @@ import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
@ -411,20 +410,27 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
try {
final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row");
final SegmentIdentifier identifier = driver.add(
row,
sequenceNames.get(record.partition()),
committerSupplier
);
if (identifier == null) {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
if (!ioConfig.getMinimumMessageTime().isPresent() ||
!ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) {
final SegmentIdentifier identifier = driver.add(
row,
sequenceNames.get(record.partition()),
committerSupplier
);
if (identifier == null) {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}
fireDepartmentMetrics.incrementProcessed();
} else {
fireDepartmentMetrics.incrementThrownAway();
}
fireDepartmentMetrics.incrementProcessed();
}
catch (ParseException e) {
if (tuningConfig.isReportParseExceptions()) {
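Note that the added check uses `isAfter()`, so the cutoff is inclusive: a row stamped exactly at `minimumMessageTime` is still processed. A small standalone sketch of the same predicate (class name hypothetical):

import com.google.common.base.Optional;
import org.joda.time.DateTime;

public class MinimumMessageTimePredicateSketch
{
  public static void main(String[] args)
  {
    Optional<DateTime> minimumMessageTime = Optional.of(new DateTime("2016-01-01T11:00Z"));

    // Mirrors the condition above: process unless the row timestamp is strictly
    // before the cutoff.
    DateTime atCutoff = new DateTime("2016-01-01T11:00Z");
    DateTime beforeCutoff = new DateTime("2016-01-01T10:59Z");

    System.out.println(!minimumMessageTime.get().isAfter(atCutoff));     // true  (processed)
    System.out.println(!minimumMessageTime.get().isAfter(beforeCutoff)); // false (thrownAway)
  }
}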

KafkaSupervisor.java

@ -37,8 +37,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
@ -113,11 +113,13 @@ public class KafkaSupervisor implements Supervisor
final Map<Integer, Long> partitionOffsets;
final Map<String, TaskData> tasks = new HashMap<>();
final Optional<DateTime> minimumMessageTime;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
public TaskGroup(Map<Integer, Long> partitionOffsets)
public TaskGroup(Map<Integer, Long> partitionOffsets, Optional<DateTime> minimumMessageTime)
{
this.partitionOffsets = partitionOffsets;
this.minimumMessageTime = minimumMessageTime;
}
}
@ -450,6 +452,9 @@ public class KafkaSupervisor implements Supervisor
}
String partitionOffsetStr = sb.toString().substring(1);
Optional<DateTime> minimumMessageTime = taskGroups.get(groupId).minimumMessageTime;
String minMsgTimeStr = (minimumMessageTime.isPresent() ? String.valueOf(minimumMessageTime.get().getMillis()) : "");
String dataSchema, tuningConfig;
try {
dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema());
@ -459,7 +464,8 @@ public class KafkaSupervisor implements Supervisor
throw Throwables.propagate(e);
}
String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr).substring(0, 15);
String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr)
.substring(0, 15);
return Joiner.on("_").join("index_kafka", dataSource, hashCode);
}
@ -599,11 +605,19 @@ public class KafkaSupervisor implements Supervisor
log.debug("Creating new task group [%d]", taskGroupId);
taskGroups.put(
taskGroupId,
new TaskGroup(kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap())
new TaskGroup(
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap(),
kafkaTask.getIOConfig().getMinimumMessageTime()
)
);
}
taskGroups.get(taskGroupId).tasks.put(taskId, new TaskData());
if (!isTaskCurrent(taskGroupId, taskId)) {
log.info("Stopping task [%s] which does not match the expected parameters and ingestion spec", taskId);
stopTask(taskId, false);
} else {
taskGroups.get(taskGroupId).tasks.put(taskId, new TaskData());
}
}
}
}
@ -635,7 +649,11 @@ public class KafkaSupervisor implements Supervisor
}
log.info("Creating new pending completion task group for discovered task [%s]", taskId);
TaskGroup newTaskGroup = new TaskGroup(startingPartitions);
// reading the minimumMessageTime from the publishing task and setting it here is not necessary as this task cannot
// change to a state where it will read any more events
TaskGroup newTaskGroup = new TaskGroup(startingPartitions, Optional.<DateTime>absent());
newTaskGroup.tasks.put(taskId, new TaskData());
newTaskGroup.completionTimeout = DateTime.now().plus(ioConfig.getCompletionTimeout());
@ -889,7 +907,8 @@ public class KafkaSupervisor implements Supervisor
TaskGroup taskGroup = taskGroupEntry.getValue();
// Iterate the list of known tasks in this group and:
// 1) Kill any tasks which are not "current" (have the partitions and starting offsets in [taskGroups]
// 1) Kill any tasks which are not "current" (have the partitions, starting offsets, and minimumMessageTime
// (if applicable) in [taskGroups])
// 2) Remove any tasks that have failed from the list
// 3) If any task completed successfully, stop all the tasks in this group and move to the next group
@ -933,7 +952,12 @@ public class KafkaSupervisor implements Supervisor
for (Integer groupId : partitionGroups.keySet()) {
if (!taskGroups.containsKey(groupId)) {
log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet());
taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId)));
Optional<DateTime> minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
DateTime.now().minus(ioConfig.getLateMessageRejectionPeriod().get())
) : Optional.<DateTime>absent());
taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime));
}
}
@ -971,6 +995,7 @@ public class KafkaSupervisor implements Supervisor
String sequenceName = generateSequenceName(groupId);
Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
sequenceName,
@ -978,7 +1003,8 @@ public class KafkaSupervisor implements Supervisor
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
consumerProperties,
true,
false
false,
minimumMessageTime
);
for (int i = 0; i < replicas; i++) {
@ -1098,7 +1124,8 @@ public class KafkaSupervisor implements Supervisor
/**
* Compares the sequence name from the task with one generated for the task's group ID and returns false if they do
* not match. The sequence name is generated from a hash of the dataSchema, tuningConfig, and starting offsets.
* not match. The sequence name is generated from a hash of the dataSchema, tuningConfig, starting offsets, and the
* minimumMessageTime if set.
*/
private boolean isTaskCurrent(int taskGroupId, String taskId)
{
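To make the Javadoc above concrete, here is a minimal sketch of the sequence-name derivation that `isTaskCurrent()` compares against, mirroring `generateSequenceName()`; the input values are hypothetical.

import com.google.common.base.Joiner;
import org.apache.commons.codec.digest.DigestUtils;

public class SequenceNameSketch
{
  public static void main(String[] args)
  {
    // In the supervisor these come from the serialized spec and the task group.
    String dataSchema = "{\"dataSource\":\"metrics\"}";
    String tuningConfig = "{\"type\":\"kafka\"}";
    String partitionOffsetStr = "0:0,1:0";
    String minMsgTimeStr = "1464696000000"; // empty string when no minimumMessageTime is set

    // Any change to a hashed input, including minimumMessageTime, changes the name,
    // which is how tasks started with stale parameters are detected and stopped.
    String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr)
                                 .substring(0, 15);
    System.out.println(Joiner.on("_").join("index_kafka", "metrics", hashCode));
  }
}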

KafkaSupervisorIOConfig.java

@ -21,6 +21,7 @@ package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -40,6 +41,7 @@ public class KafkaSupervisorIOConfig
private final Duration period;
private final Boolean useEarliestOffset;
private final Duration completionTimeout;
private final Optional<Duration> lateMessageRejectionPeriod;
@JsonCreator
public KafkaSupervisorIOConfig(
@ -51,7 +53,8 @@ public class KafkaSupervisorIOConfig
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@JsonProperty("completionTimeout") Period completionTimeout
@JsonProperty("completionTimeout") Period completionTimeout,
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod
)
{
this.topic = Preconditions.checkNotNull(topic, "topic");
@ -68,6 +71,9 @@ public class KafkaSupervisorIOConfig
this.period = defaultDuration(period, "PT30S");
this.useEarliestOffset = (useEarliestOffset != null ? useEarliestOffset : false);
this.completionTimeout = defaultDuration(completionTimeout, "PT30M");
this.lateMessageRejectionPeriod = (lateMessageRejectionPeriod == null
? Optional.<Duration>absent()
: Optional.of(lateMessageRejectionPeriod.toStandardDuration()));
}
@JsonProperty
@ -124,6 +130,12 @@ public class KafkaSupervisorIOConfig
return completionTimeout;
}
@JsonProperty
public Optional<Duration> getLateMessageRejectionPeriod()
{
return lateMessageRejectionPeriod;
}
private static Duration defaultDuration(final Period period, final String theDefault)
{
return (period == null ? new Period(theDefault) : period).toStandardDuration();
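For reference, a minimal sketch of the null handling and `Period` to `Duration` conversion used for `lateMessageRejectionPeriod` above (class name hypothetical; note that `toStandardDuration()` only works for fixed-length periods such as `PT1H`):

import com.google.common.base.Optional;
import org.joda.time.Duration;
import org.joda.time.Period;

public class LateMessageRejectionPeriodSketch
{
  public static void main(String[] args)
  {
    Period lateMessageRejectionPeriod = new Period("PT1H"); // null when omitted from the spec

    // Same pattern as the constructor above: absent when unset, otherwise a Duration.
    Optional<Duration> value = (lateMessageRejectionPeriod == null
                                ? Optional.<Duration>absent()
                                : Optional.of(lateMessageRejectionPeriod.toStandardDuration()));

    System.out.println(value.get().equals(Duration.standardHours(1))); // true
  }
}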

KafkaIOConfigTest.java

@ -0,0 +1,248 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.indexing.IOConfig;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class KafkaIOConfigTest
{
private final ObjectMapper mapper;
public KafkaIOConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
}
@Rule
public final ExpectedException exception = ExpectedException.none();
@Test
public void testSerdeWithDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n"
+ "}";
KafkaIOConfig config = (KafkaIOConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
IOConfig.class
)
), IOConfig.class
);
Assert.assertEquals("my-sequence-name", config.getBaseSequenceName());
Assert.assertEquals("mytopic", config.getStartPartitions().getTopic());
Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 10L), config.getStartPartitions().getPartitionOffsetMap());
Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(true, config.isUseTransaction());
Assert.assertEquals(false, config.isPauseAfterRead());
Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
}
@Test
public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
KafkaIOConfig config = (KafkaIOConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
IOConfig.class
)
), IOConfig.class
);
Assert.assertEquals("my-sequence-name", config.getBaseSequenceName());
Assert.assertEquals("mytopic", config.getStartPartitions().getTopic());
Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 10L), config.getStartPartitions().getPartitionOffsetMap());
Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(false, config.isUseTransaction());
Assert.assertEquals(true, config.isPauseAfterRead());
Assert.assertEquals(new DateTime("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
}
@Test
public void testBaseSequenceNameRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("baseSequenceName"));
mapper.readValue(jsonStr, IOConfig.class);
}
@Test
public void testStartPartitionsRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("startPartitions"));
mapper.readValue(jsonStr, IOConfig.class);
}
@Test
public void testEndPartitionsRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("endPartitions"));
mapper.readValue(jsonStr, IOConfig.class);
}
@Test
public void testConsumerPropertiesRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("consumerProperties"));
mapper.readValue(jsonStr, IOConfig.class);
}
@Test
public void testStartAndEndTopicMatch() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(IllegalArgumentException.class));
exception.expectMessage(CoreMatchers.containsString("start topic and end topic must match"));
mapper.readValue(jsonStr, IOConfig.class);
}
@Test
public void testStartAndEndPartitionSetMatch() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(IllegalArgumentException.class));
exception.expectMessage(CoreMatchers.containsString("start partition set and end partition set must match"));
mapper.readValue(jsonStr, IOConfig.class);
}
@Test
public void testEndOffsetGreaterThanStart() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(IllegalArgumentException.class));
exception.expectMessage(CoreMatchers.containsString("end offset must be >= start offset"));
mapper.readValue(jsonStr, IOConfig.class);
}
}

KafkaIndexTaskTest.java

@ -117,6 +117,7 @@ import org.apache.curator.test.TestingCluster;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
@ -301,7 +302,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -341,7 +343,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -382,6 +385,59 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
}
@Test(timeout = 60_000L)
public void testRunWithMinimumMessageTime() throws Exception
{
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
"sequence0",
new KafkaPartitions("topic0", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
new DateTime("2010")
),
null
);
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for the task to start reading
while (task.getStatus() != KafkaIndexTask.Status.READING) {
Thread.sleep(10);
}
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : RECORDS) {
kafkaProducer.send(record).get();
}
}
// Wait for task to exit
Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
}
@Test(timeout = 60_000L)
public void testRunOnNothing() throws Exception
{
@ -400,7 +456,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -439,7 +496,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -489,7 +547,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -538,7 +597,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -569,7 +629,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -581,7 +642,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -633,7 +695,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -645,7 +708,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -698,7 +762,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
false,
false
false,
null
),
null
);
@ -710,7 +775,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
kafkaServer.consumerProperties(),
false,
false
false,
null
),
null
);
@ -768,7 +834,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -823,7 +890,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -835,7 +903,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(1, 1L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -889,7 +958,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -922,7 +992,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -972,7 +1043,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false
false,
null
),
null
);
@ -1053,7 +1125,8 @@ public class KafkaIndexTaskTest
new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
kafkaServer.consumerProperties(),
true,
true
true,
null
),
null
);

KafkaTuningConfigTest.java

@ -0,0 +1,104 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.TuningConfig;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
public class KafkaTuningConfigTest
{
private final ObjectMapper mapper;
public KafkaTuningConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
}
@Test
public void testSerdeWithDefaults() throws Exception
{
String jsonStr = "{\"type\": \"kafka\"}";
KafkaTuningConfig config = (KafkaTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(75000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(false, config.getBuildV9Directly());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
}
@Test
public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"buildV9Directly\": true,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100\n"
+ "}";
KafkaTuningConfig config = (KafkaTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
}
}

KafkaSupervisorIOConfigTest.java

@ -0,0 +1,158 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.kafka.KafkaIndexTaskModule;
import io.druid.jackson.DefaultObjectMapper;
import org.hamcrest.CoreMatchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class KafkaSupervisorIOConfigTest
{
private final ObjectMapper mapper;
public KafkaSupervisorIOConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
}
@Rule
public final ExpectedException exception = ExpectedException.none();
@Test
public void testSerdeWithDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n"
+ "}";
KafkaSupervisorIOConfig config = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
KafkaSupervisorIOConfig.class
)
), KafkaSupervisorIOConfig.class
);
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertEquals(1, (int) config.getReplicas());
Assert.assertEquals(1, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
Assert.assertEquals(false, config.isUseEarliestOffset());
Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout());
Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
}
@Test
public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"replicas\": 3,\n"
+ " \"taskCount\": 9,\n"
+ " \"taskDuration\": \"PT30M\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"startDelay\": \"PT1M\",\n"
+ " \"period\": \"PT10S\",\n"
+ " \"useEarliestOffset\": true,\n"
+ " \"completionTimeout\": \"PT45M\",\n"
+ " \"lateMessageRejectionPeriod\": \"PT1H\"\n"
+ "}";
KafkaSupervisorIOConfig config = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
KafkaSupervisorIOConfig.class
)
), KafkaSupervisorIOConfig.class
);
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertEquals(3, (int) config.getReplicas());
Assert.assertEquals(9, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
Assert.assertEquals(true, config.isUseEarliestOffset());
Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout());
Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get());
}
@Test
public void testTopicRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("topic"));
mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class);
}
@Test
public void testConsumerPropertiesRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\"\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("consumerProperties"));
mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class);
}
@Test
public void testBootstrapServersRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"consumerProperties\": {}\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("bootstrap.servers"));
mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class);
}
}

KafkaSupervisorTest.java

@ -36,8 +36,8 @@ import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.Task;
@ -181,7 +181,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testNoInitialState() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H");
supervisor = getSupervisor(1, 1, true, "PT1H", null);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@ -211,6 +211,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
Assert.assertEquals(KAFKA_TOPIC, taskConfig.getStartPartitions().getTopic());
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
@ -226,7 +227,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testMultiTask() throws Exception
{
supervisor = getSupervisor(1, 2, true, "PT1H");
supervisor = getSupervisor(1, 2, true, "PT1H", null);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@ -263,7 +264,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testReplicas() throws Exception
{
supervisor = getSupervisor(2, 1, true, "PT1H");
supervisor = getSupervisor(2, 1, true, "PT1H", null);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@ -297,13 +298,52 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(0L, (long) task2.getIOConfig().getStartPartitions().getPartitionOffsetMap().get(2));
}
@Test
public void testLateMessageRejectionPeriod() throws Exception
{
supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"));
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.<TaskRunner>absent()).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
KafkaIndexTask task1 = captured.getValues().get(0);
KafkaIndexTask task2 = captured.getValues().get(1);
Assert.assertTrue(
"minimumMessageTime",
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(59).isBeforeNow()
);
Assert.assertTrue(
"minimumMessageTime",
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(61).isAfterNow()
);
Assert.assertEquals(
task1.getIOConfig().getMinimumMessageTime().get(),
task2.getIOConfig().getMinimumMessageTime().get()
);
}
@Test
/**
* Test generating the starting offsets from the partition high water marks in Kafka.
*/
public void testLatestOffset() throws Exception
{
supervisor = getSupervisor(1, 1, false, "PT1H");
supervisor = getSupervisor(1, 1, false, "PT1H", null);
addSomeEvents(1100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@ -335,7 +375,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
*/
public void testDatasourceMetadata() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H");
supervisor = getSupervisor(1, 1, true, "PT1H", null);
addSomeEvents(100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@ -365,7 +405,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test(expected = ISE.class)
public void testBadMetadataOffsets() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H");
supervisor = getSupervisor(1, 1, true, "PT1H", null);
addSomeEvents(1);
expect(taskMaster.getTaskRunner()).andReturn(Optional.<TaskRunner>absent()).anyTimes();
@ -384,7 +424,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillIncompatibleTasks() throws Exception
{
supervisor = getSupervisor(2, 1, true, "PT1H");
supervisor = getSupervisor(2, 1, true, "PT1H", null);
addSomeEvents(1);
Task id1 = createKafkaIndexTask( // unexpected # of partitions (kill)
@ -392,7 +432,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
"index_kafka_testDS__some_other_sequenceName",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 10L))
new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
null
);
Task id2 = createKafkaIndexTask( // correct number of partitions and ranges (don't kill)
@ -400,7 +441,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L))
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)),
null
);
Task id3 = createKafkaIndexTask( // unexpected range on partition 2 (kill)
@ -408,7 +450,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
"index_kafka_testDS__some_other_sequenceName",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L))
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)),
null
);
Task id4 = createKafkaIndexTask( // different datasource (don't kill)
@ -416,7 +459,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
"other-datasource",
"index_kafka_testDS_d927edff33c4b3f",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 10L))
new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
null
);
Task id5 = new RealtimeIndexTask( // non KafkaIndexTask (don't kill)
@ -465,7 +509,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillBadPartitionAssignment() throws Exception
{
supervisor = getSupervisor(1, 2, true, "PT1H");
supervisor = getSupervisor(1, 2, true, "PT1H", null);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -473,35 +517,40 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE))
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-1",
new KafkaPartitions("topic", ImmutableMap.of(1, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE))
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)),
null
);
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE))
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Task id4 = createKafkaIndexTask(
"id4",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE))
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)),
null
);
Task id5 = createKafkaIndexTask(
"id5",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE))
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4, id5);
@ -542,7 +591,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testRequeueTaskWhenFailed() throws Exception
{
supervisor = getSupervisor(2, 2, true, "PT1H");
supervisor = getSupervisor(2, 2, true, "PT1H", null);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -608,10 +657,82 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
}
@Test
public void testRequeueAdoptedTaskWhenFailed() throws Exception
{
supervisor = getSupervisor(2, 1, true, "PT1H", null);
addSomeEvents(1);
DateTime now = DateTime.now();
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
now
);
List<Task> existingTasks = ImmutableList.of(id1);
Capture<Task> captured = Capture.newInstance();
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id"))
.anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true);
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
// check that replica tasks are created with the same minimumMessageTime as tasks inherited from another supervisor
Assert.assertEquals(now, ((KafkaIndexTask) captured.getValue()).getIOConfig().getMinimumMessageTime().get());
// test that a task failing causes a new task to be re-queued with the same parameters
String runningTaskId = captured.getValue().getId();
Capture<Task> aNewTaskCapture = Capture.newInstance();
KafkaIndexTask iHaveFailed = (KafkaIndexTask) existingTasks.get(0);
reset(taskStorage);
reset(taskQueue);
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
expect(taskStorage.getStatus(runningTaskId)).andReturn(Optional.of(TaskStatus.running(runningTaskId))).anyTimes();
expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of((Task) iHaveFailed)).anyTimes();
expect(taskStorage.getTask(runningTaskId)).andReturn(Optional.of(captured.getValue())).anyTimes();
expect(taskQueue.add(capture(aNewTaskCapture))).andReturn(true);
replay(taskStorage);
replay(taskQueue);
supervisor.runInternal();
verifyAll();
Assert.assertNotEquals(iHaveFailed.getId(), aNewTaskCapture.getValue().getId());
Assert.assertEquals(
iHaveFailed.getIOConfig().getBaseSequenceName(),
((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getBaseSequenceName()
);
// check that failed tasks are recreated with the same minimumMessageTime as the task it replaced, even if that
// task came from another supervisor
Assert.assertEquals(now, ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get());
}
@Test
public void testQueueNextTasksOnSuccess() throws Exception
{
supervisor = getSupervisor(2, 2, true, "PT1H");
supervisor = getSupervisor(2, 2, true, "PT1H", null);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -680,7 +801,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(2, 2, true, "PT1M");
supervisor = getSupervisor(2, 2, true, "PT1M", null);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -762,7 +883,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(1, 1, true, "PT1H");
supervisor = getSupervisor(1, 1, true, "PT1H", null);
addSomeEvents(1);
Task task = createKafkaIndexTask(
@ -770,7 +891,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE))
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Collection workItems = new ArrayList<>();
@ -851,7 +973,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145);
final DateTime startTime = new DateTime();
supervisor = getSupervisor(1, 1, true, "PT1H");
supervisor = getSupervisor(1, 1, true, "PT1H", null);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -859,7 +981,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE))
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Task id2 = createKafkaIndexTask(
@ -867,7 +990,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE))
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Collection workItems = new ArrayList<>();
@ -932,7 +1056,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test(expected = IllegalStateException.class)
public void testStopNotStarted() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H");
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor.stop(false);
}
@ -943,7 +1067,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.unregisterListener(String.format("KafkaSupervisor-%s", DATASOURCE));
replayAll();
supervisor = getSupervisor(1, 1, true, "PT1H");
supervisor = getSupervisor(1, 1, true, "PT1H", null);
supervisor.start();
supervisor.stop(false);
@ -968,7 +1092,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
}
private KafkaSupervisor getSupervisor(int replicas, int taskCount, boolean useEarliestOffset, String duration)
private KafkaSupervisor getSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
String duration,
Period lateMessageRejectionPeriod
)
{
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
KAFKA_TOPIC,
@ -979,7 +1109,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new Period("P1D"),
new Period("PT30S"),
useEarliestOffset,
new Period("PT30M")
new Period("PT30M"),
lateMessageRejectionPeriod
);
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null)
@ -1049,7 +1180,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
String dataSource,
String sequenceName,
KafkaPartitions startPartitions,
KafkaPartitions endPartitions
KafkaPartitions endPartitions,
DateTime minimumMessageTime
)
{
return new KafkaIndexTask(
@ -1063,7 +1195,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
endPartitions,
ImmutableMap.<String, String>of(),
true,
false
false,
minimumMessageTime
),
ImmutableMap.<String, Object>of(),
null