Message rejection absolute date (#8656)

* Add option lateMessageRejectionStartDate

* Use option lateMessageRejectionStartDate

* Fix tests

* Add lateMessageRejectionStartDate to kafka indexing service

* Update tests kafka indexing service

* Fix tests for KafkaSupervisorTest

* Add lateMessageRejectionStartDate to KinesisSupervisorIOConfig

* Fix var name

* Update documentation

* Add check lateMessageRejectionStartDateTime and lateMessageRejectionPeriod, fails if both were specified.
This commit is contained in:
Giuseppe Martino 2019-10-31 23:13:02 +01:00 committed by Himanshu
parent e70b71c90f
commit 9c171e2b1f
12 changed files with 152 additions and 35 deletions

View File

@ -205,7 +205,8 @@ For Roaring bitmaps:
|`period`|ISO8601 Period|How often the supervisor will execute its management logic. Note that the supervisor will also run in response to certain events (such as tasks succeeding, failing, and reaching their taskDuration) so this value specifies the maximum time between iterations.|no (default == PT30S)|
|`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.|no (default == false)|
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
## Operations

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.Period;
import java.util.Map;
@ -53,7 +54,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@JsonProperty("completionTimeout") Period completionTimeout,
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime
)
{
super(
@ -66,7 +68,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
useEarliestOffset,
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod
earlyMessageRejectionPeriod,
lateMessageRejectionStartDateTime
);
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
@ -117,6 +120,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", completionTimeout=" + getCompletionTimeout() +
", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() +
", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
'}';
}

View File

@ -149,6 +149,7 @@ public class KafkaSamplerSpecTest
true,
null,
null,
null,
null
),
null,

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.hamcrest.CoreMatchers;
import org.joda.time.Duration;
import org.junit.Assert;
@ -78,35 +79,36 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout());
Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent());
Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent());
}
@Test
public void testSerdeWithNonDefaults() throws Exception
public void testSerdeWithNonDefaultsWithLateMessagePeriod() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"replicas\": 3,\n"
+ " \"taskCount\": 9,\n"
+ " \"taskDuration\": \"PT30M\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"pollTimeout\": 1000,\n"
+ " \"startDelay\": \"PT1M\",\n"
+ " \"period\": \"PT10S\",\n"
+ " \"useEarliestOffset\": true,\n"
+ " \"completionTimeout\": \"PT45M\",\n"
+ " \"lateMessageRejectionPeriod\": \"PT1H\",\n"
+ " \"earlyMessageRejectionPeriod\": \"PT1H\"\n"
+ "}";
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"replicas\": 3,\n"
+ " \"taskCount\": 9,\n"
+ " \"taskDuration\": \"PT30M\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"pollTimeout\": 1000,\n"
+ " \"startDelay\": \"PT1M\",\n"
+ " \"period\": \"PT10S\",\n"
+ " \"useEarliestOffset\": true,\n"
+ " \"completionTimeout\": \"PT45M\",\n"
+ " \"lateMessageRejectionPeriod\": \"PT1H\",\n"
+ " \"earlyMessageRejectionPeriod\": \"PT1H\"\n"
+ "}";
KafkaSupervisorIOConfig config = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
KafkaSupervisorIOConfig.class
)
), KafkaSupervisorIOConfig.class
);
)
), KafkaSupervisorIOConfig.class
);
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertEquals(3, (int) config.getReplicas());
@ -122,6 +124,47 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get());
}
@Test
public void testSerdeWithNonDefaultsWithLateMessageStartDateTime() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"replicas\": 3,\n"
+ " \"taskCount\": 9,\n"
+ " \"taskDuration\": \"PT30M\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"pollTimeout\": 1000,\n"
+ " \"startDelay\": \"PT1M\",\n"
+ " \"period\": \"PT10S\",\n"
+ " \"useEarliestOffset\": true,\n"
+ " \"completionTimeout\": \"PT45M\",\n"
+ " \"earlyMessageRejectionPeriod\": \"PT1H\",\n"
+ " \"lateMessageRejectionStartDateTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
KafkaSupervisorIOConfig config = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
KafkaSupervisorIOConfig.class
)
), KafkaSupervisorIOConfig.class
);
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertEquals(3, (int) config.getReplicas());
Assert.assertEquals(9, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(1000, config.getPollTimeout());
Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
Assert.assertEquals(true, config.isUseEarliestOffset());
Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout());
Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getLateMessageRejectionStartDateTime().get());
}
@Test
public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
{
@ -177,14 +220,44 @@ public class KafkaSupervisorIOConfigTest
public void testBootstrapServersRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"consumerProperties\": {}\n"
+ "}";
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"consumerProperties\": {}\n"
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("bootstrap.servers"));
mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class);
}
@Test
public void testSerdeWithBothExclusiveProperties() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"replicas\": 3,\n"
+ " \"taskCount\": 9,\n"
+ " \"taskDuration\": \"PT30M\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"pollTimeout\": 1000,\n"
+ " \"startDelay\": \"PT1M\",\n"
+ " \"period\": \"PT10S\",\n"
+ " \"useEarliestOffset\": true,\n"
+ " \"completionTimeout\": \"PT45M\",\n"
+ " \"lateMessageRejectionPeriod\": \"PT1H\",\n"
+ " \"earlyMessageRejectionPeriod\": \"PT1H\",\n"
+ " \"lateMessageRejectionStartDateTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
KafkaSupervisorIOConfig config = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
KafkaSupervisorIOConfig.class
)
), KafkaSupervisorIOConfig.class
);
}
}

View File

@ -3229,7 +3229,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
useEarliestOffset,
new Period("PT30M"),
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod
earlyMessageRejectionPeriod,
null
);
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
@ -3336,7 +3337,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
useEarliestOffset,
new Period("PT30M"),
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod
earlyMessageRejectionPeriod,
null
);
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
@ -3447,7 +3449,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
useEarliestOffset,
new Period("PT30M"),
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod
earlyMessageRejectionPeriod,
null
);
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisRegion;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.joda.time.DateTime;
import org.joda.time.Period;
public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@ -62,6 +63,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("completionTimeout") Period completionTimeout,
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("recordsPerFetch") Integer recordsPerFetch,
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@ -79,7 +81,8 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
useEarliestSequenceNumber,
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod
earlyMessageRejectionPeriod,
lateMessageRejectionStartDateTime
);
this.endpoint = endpoint != null
? endpoint
@ -146,6 +149,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", completionTimeout=" + getCompletionTimeout() +
", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() +
", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
", recordsPerFetch=" + recordsPerFetch +
", fetchDelayMillis=" + fetchDelayMillis +
", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' +

View File

@ -164,6 +164,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
null,
null,
null,
null,
false
),
null,

View File

@ -69,6 +69,7 @@ public class KinesisSupervisorIOConfigTest
Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout());
Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent());
Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent());
Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch());
Assert.assertEquals((Integer) 0, config.getFetchDelayMillis());
Assert.assertNull(config.getAwsAssumedRoleArn());

View File

@ -4611,6 +4611,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
false
);
@ -4743,6 +4744,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new Period("PT30M"),
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
recordsPerFetch,
fetchDelayMillis,
null,
@ -4827,6 +4829,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new Period("PT30M"),
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
recordsPerFetch,
fetchDelayMillis,
null,
@ -4913,6 +4916,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new Period("PT30M"),
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
recordsPerFetch,
fetchDelayMillis,
null,

View File

@ -2672,10 +2672,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (Integer groupId : partitionGroups.keySet()) {
if (!activelyReadingTaskGroups.containsKey(groupId)) {
log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet());
Optional<DateTime> minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
) : Optional.absent());
Optional<DateTime> minimumMessageTime;
if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
minimumMessageTime = Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
} else {
minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
) : Optional.absent());
}
Optional<DateTime> maximumMessageTime = (ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of(
DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())

View File

@ -22,9 +22,12 @@ package org.apache.druid.indexing.seekablestream.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
public abstract class SeekableStreamSupervisorIOConfig
{
private final String stream;
@ -37,6 +40,7 @@ public abstract class SeekableStreamSupervisorIOConfig
private final Duration completionTimeout;
private final Optional<Duration> lateMessageRejectionPeriod;
private final Optional<Duration> earlyMessageRejectionPeriod;
private final Optional<DateTime> lateMessageRejectionStartDateTime;
public SeekableStreamSupervisorIOConfig(
String stream,
@ -48,7 +52,8 @@ public abstract class SeekableStreamSupervisorIOConfig
Boolean useEarliestSequenceNumber,
Period completionTimeout,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod
Period earlyMessageRejectionPeriod,
DateTime lateMessageRejectionStartDateTime
)
{
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
@ -62,9 +67,19 @@ public abstract class SeekableStreamSupervisorIOConfig
this.lateMessageRejectionPeriod = lateMessageRejectionPeriod == null
? Optional.absent()
: Optional.of(lateMessageRejectionPeriod.toStandardDuration());
this.lateMessageRejectionStartDateTime = lateMessageRejectionStartDateTime == null
? Optional.absent()
: Optional.of(lateMessageRejectionStartDateTime);
this.earlyMessageRejectionPeriod = earlyMessageRejectionPeriod == null
? Optional.absent()
: Optional.of(earlyMessageRejectionPeriod.toStandardDuration());
if (this.lateMessageRejectionPeriod.isPresent()
&& this.lateMessageRejectionStartDateTime.isPresent()) {
throw new IAE("SeekableStreamSupervisorIOConfig does not support "
+ "both properties lateMessageRejectionStartDateTime "
+ "and lateMessageRejectionPeriod.");
}
}
private static Duration defaultDuration(final Period period, final String theDefault)
@ -131,4 +146,10 @@ public abstract class SeekableStreamSupervisorIOConfig
{
return lateMessageRejectionPeriod;
}
@JsonProperty
public Optional<DateTime> getLateMessageRejectionStartDateTime()
{
return lateMessageRejectionStartDateTime;
}
}

View File

@ -600,7 +600,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
false,
new Period("PT30M"),
null,
null
null, null
)
{
};