From 9c171e2b1fe6de81c8e06e5726acb1728266830d Mon Sep 17 00:00:00 2001 From: Giuseppe Martino Date: Thu, 31 Oct 2019 23:13:02 +0100 Subject: [PATCH] Message rejection absolute date (#8656) * Add option lateMessageRejectionStartDate * Use option lateMessageRejectionStartDate * Fix tests * Add lateMessageRejectionStartDate to kafka indexing service * Update tests kafka indexing service * Fix tests for KafkaSupervisorTest * Add lateMessageRejectionStartDate to KinesisSupervisorIOConfig * Fix var name * Update documentation * Add check lateMessageRejectionStartDateTime and lateMessageRejectionPeriod, fails if both were specified. --- .../extensions-core/kafka-ingestion.md | 3 +- .../supervisor/KafkaSupervisorIOConfig.java | 8 +- .../indexing/kafka/KafkaSamplerSpecTest.java | 1 + .../KafkaSupervisorIOConfigTest.java | 117 ++++++++++++++---- .../kafka/supervisor/KafkaSupervisorTest.java | 9 +- .../supervisor/KinesisSupervisorIOConfig.java | 6 +- .../kinesis/KinesisSamplerSpecTest.java | 1 + .../KinesisSupervisorIOConfigTest.java | 1 + .../supervisor/KinesisSupervisorTest.java | 4 + .../supervisor/SeekableStreamSupervisor.java | 12 +- .../SeekableStreamSupervisorIOConfig.java | 23 +++- .../SeekableStreamSupervisorStateTest.java | 2 +- 12 files changed, 152 insertions(+), 35 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 482d7389c7b..fe85504828c 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -205,7 +205,8 @@ For Roaring bitmaps: |`period`|ISO8601 Period|How often the supervisor will execute its management logic. 
Note that the supervisor will also run in response to certain events (such as tasks succeeding, failing, and reaching their taskDuration) so this value specifies the maximum time between iterations.|no (default == PT30S)| |`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.|no (default == false)| |`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)| -|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)| +|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. 
a realtime and a nightly batch ingestion pipeline).|no (default == none)| +|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Only one of `lateMessageRejectionPeriod` and `lateMessageRejectionStartDateTime` may be specified; a supervisor spec that sets both is rejected.|no (default == none)| |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. 
Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)| ## Operations diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index 629daa780ed..af377a7f39c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.java.util.common.StringUtils; +import org.joda.time.DateTime; import org.joda.time.Period; import java.util.Map; @@ -53,7 +54,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig @JsonProperty("useEarliestOffset") Boolean useEarliestOffset, @JsonProperty("completionTimeout") Period completionTimeout, @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod, - @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod + @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, + @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime ) { super( @@ -66,7 +68,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig useEarliestOffset, completionTimeout, lateMessageRejectionPeriod, - earlyMessageRejectionPeriod + earlyMessageRejectionPeriod, + lateMessageRejectionStartDateTime ); this.consumerProperties = 
Preconditions.checkNotNull(consumerProperties, "consumerProperties"); @@ -117,6 +120,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig ", completionTimeout=" + getCompletionTimeout() + ", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() + ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() + + ", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 26c36c2b9be..5ede763b8d6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -149,6 +149,7 @@ public class KafkaSamplerSpecTest true, null, null, + null, null ), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 63b27866fa8..50866890b0f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.kafka.KafkaRecordSupplier; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; import org.hamcrest.CoreMatchers; import org.joda.time.Duration; import 
org.junit.Assert; @@ -78,35 +79,36 @@ public class KafkaSupervisorIOConfigTest Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout()); Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent()); Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent()); + Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent()); } @Test - public void testSerdeWithNonDefaults() throws Exception + public void testSerdeWithNonDefaultsWithLateMessagePeriod() throws Exception { String jsonStr = "{\n" - + " \"type\": \"kafka\",\n" - + " \"topic\": \"my-topic\",\n" - + " \"replicas\": 3,\n" - + " \"taskCount\": 9,\n" - + " \"taskDuration\": \"PT30M\",\n" - + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" - + " \"pollTimeout\": 1000,\n" - + " \"startDelay\": \"PT1M\",\n" - + " \"period\": \"PT10S\",\n" - + " \"useEarliestOffset\": true,\n" - + " \"completionTimeout\": \"PT45M\",\n" - + " \"lateMessageRejectionPeriod\": \"PT1H\",\n" - + " \"earlyMessageRejectionPeriod\": \"PT1H\"\n" - + "}"; + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"replicas\": 3,\n" + + " \"taskCount\": 9,\n" + + " \"taskDuration\": \"PT30M\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"pollTimeout\": 1000,\n" + + " \"startDelay\": \"PT1M\",\n" + + " \"period\": \"PT10S\",\n" + + " \"useEarliestOffset\": true,\n" + + " \"completionTimeout\": \"PT45M\",\n" + + " \"lateMessageRejectionPeriod\": \"PT1H\",\n" + + " \"earlyMessageRejectionPeriod\": \"PT1H\"\n" + + "}"; KafkaSupervisorIOConfig config = mapper.readValue( mapper.writeValueAsString( mapper.readValue( jsonStr, KafkaSupervisorIOConfig.class - ) - ), KafkaSupervisorIOConfig.class - ); + ) + ), KafkaSupervisorIOConfig.class + ); Assert.assertEquals("my-topic", config.getTopic()); Assert.assertEquals(3, 
(int) config.getReplicas()); @@ -122,6 +124,47 @@ public class KafkaSupervisorIOConfigTest Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get()); } + @Test + public void testSerdeWithNonDefaultsWithLateMessageStartDateTime() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"replicas\": 3,\n" + + " \"taskCount\": 9,\n" + + " \"taskDuration\": \"PT30M\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"pollTimeout\": 1000,\n" + + " \"startDelay\": \"PT1M\",\n" + + " \"period\": \"PT10S\",\n" + + " \"useEarliestOffset\": true,\n" + + " \"completionTimeout\": \"PT45M\",\n" + + " \"earlyMessageRejectionPeriod\": \"PT1H\",\n" + + " \"lateMessageRejectionStartDateTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + KafkaSupervisorIOConfig.class + ) + ), KafkaSupervisorIOConfig.class + ); + + Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertEquals(3, (int) config.getReplicas()); + Assert.assertEquals(9, (int) config.getTaskCount()); + Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration()); + Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); + Assert.assertEquals(1000, config.getPollTimeout()); + Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay()); + Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod()); + Assert.assertEquals(true, config.isUseEarliestOffset()); + Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout()); + Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getLateMessageRejectionStartDateTime().get()); + } + @Test public void testSerdeForConsumerPropertiesWithPasswords() throws Exception { @@ -177,14 +220,44 @@ public class 
KafkaSupervisorIOConfigTest public void testBootstrapServersRequired() throws Exception { String jsonStr = "{\n" - + " \"type\": \"kafka\",\n" - + " \"topic\": \"my-topic\",\n" - + " \"consumerProperties\": {}\n" - + "}"; + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"consumerProperties\": {}\n" + + "}"; exception.expect(JsonMappingException.class); exception.expectCause(CoreMatchers.isA(NullPointerException.class)); exception.expectMessage(CoreMatchers.containsString("bootstrap.servers")); mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); } + + @Test + public void testSerdeWithBothExclusiveProperties() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"replicas\": 3,\n" + + " \"taskCount\": 9,\n" + + " \"taskDuration\": \"PT30M\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"pollTimeout\": 1000,\n" + + " \"startDelay\": \"PT1M\",\n" + + " \"period\": \"PT10S\",\n" + + " \"useEarliestOffset\": true,\n" + + " \"completionTimeout\": \"PT45M\",\n" + + " \"lateMessageRejectionPeriod\": \"PT1H\",\n" + + " \"earlyMessageRejectionPeriod\": \"PT1H\",\n" + + " \"lateMessageRejectionStartDateTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + exception.expect(JsonMappingException.class); + KafkaSupervisorIOConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + KafkaSupervisorIOConfig.class + ) + ), KafkaSupervisorIOConfig.class + ); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 92b4e5e5ecd..89ec2b6541c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3229,7 +3229,8 @@ public class KafkaSupervisorTest extends EasyMockSupport useEarliestOffset, new Period("PT30M"), lateMessageRejectionPeriod, - earlyMessageRejectionPeriod + earlyMessageRejectionPeriod, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -3336,7 +3337,8 @@ public class KafkaSupervisorTest extends EasyMockSupport useEarliestOffset, new Period("PT30M"), lateMessageRejectionPeriod, - earlyMessageRejectionPeriod + earlyMessageRejectionPeriod, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -3447,7 +3449,8 @@ public class KafkaSupervisorTest extends EasyMockSupport useEarliestOffset, new Period("PT30M"), lateMessageRejectionPeriod, - earlyMessageRejectionPeriod + earlyMessageRejectionPeriod, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index 2fb43d31517..c69910cde8f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig; import org.apache.druid.indexing.kinesis.KinesisRegion; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.joda.time.DateTime; import org.joda.time.Period; public class KinesisSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig @@ -62,6 +63,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig @JsonProperty("completionTimeout") Period completionTimeout, @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod, @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, + @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime, @JsonProperty("recordsPerFetch") Integer recordsPerFetch, @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, @@ -79,7 +81,8 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig useEarliestSequenceNumber, completionTimeout, lateMessageRejectionPeriod, - earlyMessageRejectionPeriod + earlyMessageRejectionPeriod, + lateMessageRejectionStartDateTime ); this.endpoint = endpoint != null ? endpoint @@ -146,6 +149,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig ", completionTimeout=" + getCompletionTimeout() + ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() + ", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() + + ", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() + ", recordsPerFetch=" + recordsPerFetch + ", fetchDelayMillis=" + fetchDelayMillis + ", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' + diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 33ebe132d49..3cfcd97208e 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -164,6 
+164,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport null, null, null, + null, false ), null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java index 037c92bb051..10f866e1262 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java @@ -69,6 +69,7 @@ public class KinesisSupervisorIOConfigTest Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout()); Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent()); Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent()); + Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent()); Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch()); Assert.assertEquals((Integer) 0, config.getFetchDelayMillis()); Assert.assertNull(config.getAwsAssumedRoleArn()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 3c8037b160f..b9a1ed6e01a 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -4611,6 +4611,7 @@ public class KinesisSupervisorTest 
extends EasyMockSupport null, null, null, + null, false ); @@ -4743,6 +4744,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new Period("PT30M"), lateMessageRejectionPeriod, earlyMessageRejectionPeriod, + null, recordsPerFetch, fetchDelayMillis, null, @@ -4827,6 +4829,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new Period("PT30M"), lateMessageRejectionPeriod, earlyMessageRejectionPeriod, + null, recordsPerFetch, fetchDelayMillis, null, @@ -4913,6 +4916,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new Period("PT30M"), lateMessageRejectionPeriod, earlyMessageRejectionPeriod, + null, recordsPerFetch, fetchDelayMillis, null, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 062acb586a3..b6d9ccc0980 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2672,10 +2672,14 @@ public abstract class SeekableStreamSupervisor minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of( - DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get()) - ) : Optional.absent()); + Optional minimumMessageTime; + if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) { + minimumMessageTime = Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get()); + } else { + minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of( + DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get()) + ) : Optional.absent()); + } Optional maximumMessageTime = (ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? 
Optional.of( DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get()) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index e62f675d7fc..453a730a956 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -22,9 +22,12 @@ package org.apache.druid.indexing.seekablestream.supervisor; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; +import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; + public abstract class SeekableStreamSupervisorIOConfig { private final String stream; @@ -37,6 +40,7 @@ public abstract class SeekableStreamSupervisorIOConfig private final Duration completionTimeout; private final Optional lateMessageRejectionPeriod; private final Optional earlyMessageRejectionPeriod; + private final Optional lateMessageRejectionStartDateTime; public SeekableStreamSupervisorIOConfig( String stream, @@ -48,7 +52,8 @@ public abstract class SeekableStreamSupervisorIOConfig Boolean useEarliestSequenceNumber, Period completionTimeout, Period lateMessageRejectionPeriod, - Period earlyMessageRejectionPeriod + Period earlyMessageRejectionPeriod, + DateTime lateMessageRejectionStartDateTime ) { this.stream = Preconditions.checkNotNull(stream, "stream cannot be null"); @@ -62,9 +67,19 @@ public abstract class SeekableStreamSupervisorIOConfig this.lateMessageRejectionPeriod = lateMessageRejectionPeriod == null ? 
Optional.absent() : Optional.of(lateMessageRejectionPeriod.toStandardDuration()); + this.lateMessageRejectionStartDateTime = lateMessageRejectionStartDateTime == null + ? Optional.absent() + : Optional.of(lateMessageRejectionStartDateTime); this.earlyMessageRejectionPeriod = earlyMessageRejectionPeriod == null ? Optional.absent() : Optional.of(earlyMessageRejectionPeriod.toStandardDuration()); + + if (this.lateMessageRejectionPeriod.isPresent() + && this.lateMessageRejectionStartDateTime.isPresent()) { + throw new IAE("SeekableStreamSupervisorIOConfig does not support " + + "both properties lateMessageRejectionStartDateTime " + + "and lateMessageRejectionPeriod."); + } } private static Duration defaultDuration(final Period period, final String theDefault) @@ -131,4 +146,10 @@ public abstract class SeekableStreamSupervisorIOConfig { return lateMessageRejectionPeriod; } + + @JsonProperty + public Optional getLateMessageRejectionStartDateTime() + { + return lateMessageRejectionStartDateTime; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index ef31cb395cc..9d22d37178c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -600,7 +600,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport false, new Period("PT30M"), null, - null + null, null ) { };