From 6f436301beef010a06796c0c2223b37503f66896 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam <35785271+adithyachakilam@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:12:24 -0600 Subject: [PATCH] supervisor: make rejection periods work with stopTasksCount (#17442) * kafka-indexing: Report consumer io time * commit * backward * tests * remove unwanted changes * comments * comments * coverage * change name * fixes * fixes * comments --- docs/ingestion/supervisor.md | 2 +- .../RabbitStreamIndexTaskIOConfig.java | 8 +- .../supervisor/RabbitStreamSupervisor.java | 5 +- .../RabbitStreamSupervisorTest.java | 47 ++++ .../kafka/KafkaIndexTaskIOConfig.java | 12 +- .../kafka/supervisor/KafkaSupervisor.java | 3 +- .../indexing/kafka/KafkaIndexTaskTest.java | 160 ++++++++---- .../kafka/supervisor/KafkaSupervisorTest.java | 7 +- .../kinesis/KinesisIndexTaskIOConfig.java | 12 +- .../kinesis/supervisor/KinesisSupervisor.java | 3 +- .../indexing/kinesis/KinesisIOConfigTest.java | 4 +- .../kinesis/KinesisIndexTaskSerdeTest.java | 4 +- .../kinesis/KinesisIndexTaskTest.java | 16 +- .../supervisor/KinesisSupervisorTest.java | 4 +- .../SeekableStreamIndexTask.java | 27 -- .../SeekableStreamIndexTaskIOConfig.java | 12 +- .../SeekableStreamIndexTaskRunner.java | 49 +++- ...SeekableStreamIndexTaskRunnerAuthTest.java | 4 +- .../SeekableStreamIndexTaskRunnerTest.java | 242 ++++++++++++++++++ .../SeekableStreamSupervisorSpecTest.java | 3 +- .../SeekableStreamSupervisorStateTest.java | 6 +- 21 files changed, 519 insertions(+), 111 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index a07558389fa..0abdf7f2c3c 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -61,7 +61,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi |`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|`PT30M`| |`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No|| |`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No|| -|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No|| +|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No|| #### Task autoscaler diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java index 4c053d89455..721e66f6f3a 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java @@ -53,7 +53,9 @@ public class RabbitStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConf @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, - @JsonProperty("uri") String uri) + @JsonProperty("uri") String uri, + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes + ) { super( taskGroupId, @@ -63,7 +65,9 @@ public class RabbitStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConf useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat); + inputFormat, + refreshRejectionPeriodsInMinutes + ); this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; this.uri = uri; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index 222022e3e66..5e9d3c3d2c6 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -59,7 +59,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.joda.time.DateTime; import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -202,7 +201,9 @@ public class RabbitStreamSupervisor extends SeekableStreamSupervisor future = runTask(task); @@ -461,7 +464,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); @@ -496,7 +500,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -537,7 +542,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -588,7 +594,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -665,7 +672,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -768,7 +776,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -894,7 +903,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -972,7 +982,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -1035,7 +1046,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask staleReplica = createTask( @@ -1051,7 +1063,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1117,7 +1130,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase DateTimes.of("2010"), null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1165,7 +1179,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, DateTimes.of("2010"), INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1222,7 +1237,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1293,7 +1309,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, new TestKafkaInputFormat(INPUT_FORMAT), - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); Assert.assertTrue(task.supportsQueries()); @@ -1365,7 +1382,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, KAFKA_INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); Assert.assertTrue(task.supportsQueries()); @@ -1416,7 +1434,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1451,7 +1470,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1497,7 +1517,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1548,7 +1569,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1586,7 +1608,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1678,7 +1701,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1744,7 +1768,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1760,7 +1785,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1808,7 +1834,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1824,7 +1851,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1874,7 +1902,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1890,7 +1919,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1938,7 +1968,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1984,7 +2015,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -2000,7 +2032,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2050,7 +2083,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2084,7 +2118,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2134,7 +2169,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2177,7 +2213,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2228,7 +2265,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2299,7 +2337,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2336,7 +2375,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2383,7 +2423,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ), context ); @@ -2427,7 +2468,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2466,7 +2508,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2547,7 +2590,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2607,7 +2651,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -2630,7 +2675,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2676,7 +2722,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2708,7 +2755,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2968,7 +3016,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3029,7 +3078,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3101,7 +3151,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3175,7 +3226,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3227,7 +3279,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3281,7 +3334,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 7926a0568fd..8f2f3167ecd 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -107,6 +107,7 @@ import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -496,7 +497,8 @@ public class KafkaSupervisorTest extends EasyMockSupport null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ), new KafkaIndexTaskTuningConfig( null, @@ -5641,7 +5643,8 @@ public class KafkaSupervisorTest extends EasyMockSupport minimumMessageTime, maximumMessageTime, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ), Collections.emptyMap(), OBJECT_MAPPER diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 881d68ba896..2df59bbca35 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -78,7 +78,8 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig minimumMessageTime; private final Optional maximumMessageTime; private final InputFormat inputFormat; + private final Long refreshRejectionPeriodsInMinutes; public SeekableStreamIndexTaskIOConfig( @Nullable final Integer taskGroupId, // can be null for backward compabitility @@ -51,7 +52,8 @@ public abstract class SeekableStreamIndexTaskIOConfig partitionsThroughput = new HashMap<>(); + private volatile DateTime minMessageTime; + private volatile DateTime maxMessageTime; + public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @Nullable final InputRowParser parser, @@ -267,6 +271,18 @@ public abstract class SeekableStreamIndexTaskRunner row != null && task.withinMinMaxRecordTime(row), + row -> row != null && withinMinMaxRecordTime(row), rowIngestionMeters, parseExceptionHandler ); @@ -2092,4 +2108,35 @@ public abstract class SeekableStreamIndexTaskRunner>> getSequenceMetadataTypeReference(); + + private void refreshMinMaxMessageTime() + { + minMessageTime = minMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); + maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); + + log.info(StringUtils.format("Updated min and max messsage times to %s and %s respectively.", minMessageTime, maxMessageTime)); + } + + public boolean withinMinMaxRecordTime(final InputRow row) + { + final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp()); + final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp()); + + if (log.isDebugEnabled()) { + if (beforeMinimumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", + row.getTimestamp(), + minMessageTime + ); + } else if (afterMaximumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", + row.getTimestamp(), + maxMessageTime + ); + } + } + return !beforeMinimumMessageTime && !afterMaximumMessageTime; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java index 8b1bd4fb096..d3b69d438d5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java @@ -44,6 +44,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ResourceType; import org.easymock.EasyMock; +import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -380,7 +381,8 @@ public class SeekableStreamIndexTaskRunnerAuthTest false, DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc(), - new CsvInputFormat(null, null, true, null, 0, null) + new CsvInputFormat(null, null, true, null, 0, null), + Duration.standardHours(2).getStandardMinutes() ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java new file mode 100644 index 00000000000..f78fd680e36 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.security.AuthorizerMapper; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +@RunWith(MockitoJUnitRunner.class) +public class SeekableStreamIndexTaskRunnerTest +{ + @Mock + private InputRow row; + + @Mock + private SeekableStreamIndexTask task; + + @Test + public void testWithinMinMaxTime() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + DateTime now = DateTimes.nowUtc(); + + Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(120L); + Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().plusHours(2))); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().minusHours(2))); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); + + Mockito.when(row.getTimestamp()).thenReturn(now); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); + Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); + Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + } + + @Test + public void testWithinMinMaxTimeNotPopulated() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + DateTime now = DateTimes.nowUtc(); + + Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null); + // min max time not populated. + Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent()); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent()); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); + + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + } + + static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner + { + public TestasbleSeekableStreamIndexTaskRunner( + SeekableStreamIndexTask task, + @Nullable InputRowParser parser, + AuthorizerMapper authorizerMapper, + LockGranularity lockGranularityToUse + ) + { + super(task, parser, authorizerMapper, lockGranularityToUse); + } + + @Override + protected boolean isEndOfShard(Object seqNum) + { + return false; + } + + @Nullable + @Override + protected TreeMap getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString) + { + return null; + } + + @Override + protected Object getNextStartOffset(Object sequenceNumber) + { + return null; + } + + @Override + protected SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata(ObjectMapper mapper, Object object) + { + return null; + } + + @Override + protected List getRecords(RecordSupplier recordSupplier, TaskToolbox toolbox) + { + return null; + } + + @Override + protected SeekableStreamDataSourceMetadata createDataSourceMetadata(SeekableStreamSequenceNumbers partitions) + { + return null; + } + + @Override + protected OrderedSequenceNumber createSequenceNumber(Object sequenceNumber) + { + return null; + } + + @Override + protected boolean isEndOffsetExclusive() + { + return false; + } + + @Override + protected TypeReference> getSequenceMetadataTypeReference() + { + return null; + } + + @Override + protected void possiblyResetDataSourceMetadata(TaskToolbox toolbox, RecordSupplier recordSupplier, Set assignment) + { + + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 3281360f580..5f5aeb0ca85 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -196,7 +196,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration().getStandardMinutes() ) { }; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b9..8a3d36ea67f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2807,7 +2807,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration().getStandardMinutes() ) { }; @@ -3166,7 +3167,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration().getStandardMinutes() ) { };