supervisor: make rejection periods work with stopTasksCount (#17442)

* kafka-indexing: Report consumer io time

* commit

* backward

* tests

* remove unwanted changes

* comments

* comments

* coverage

* change name

* fixes

* fixes

* comments
This commit is contained in:
Adithya Chakilam 2024-11-18 15:12:24 -06:00 committed by GitHub
parent 24a1fafaa7
commit 6f436301be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 519 additions and 111 deletions

View File

@ -61,7 +61,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi
|`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|`PT30M`| |`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|`PT30M`|
|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No|| |`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No||
|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No|| |`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No||
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No|| |`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No||
#### Task autoscaler #### Task autoscaler

View File

@ -53,7 +53,9 @@ public class RabbitStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConf
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("uri") String uri) @JsonProperty("uri") String uri,
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
)
{ {
super( super(
taskGroupId, taskGroupId,
@ -63,7 +65,9 @@ public class RabbitStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConf
useTransaction, useTransaction,
minimumMessageTime, minimumMessageTime,
maximumMessageTime, maximumMessageTime,
inputFormat); inputFormat,
refreshRejectionPeriodsInMinutes
);
this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
this.uri = uri; this.uri = uri;

View File

@ -59,7 +59,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -202,7 +201,9 @@ public class RabbitStreamSupervisor extends SeekableStreamSupervisor<String, Lon
minimumMessageTime, minimumMessageTime,
maximumMessageTime, maximumMessageTime,
ioConfig.getInputFormat(), ioConfig.getInputFormat(),
rabbitConfig.getUri()); rabbitConfig.getUri(),
ioConfig.getTaskDuration().getStandardMinutes()
);
} }
@Override @Override

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.rabbitstream.supervisor;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.JsonInputFormat;
@ -37,6 +38,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfi
import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory; import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory;
import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier; import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@ -366,4 +368,49 @@ public class RabbitStreamSupervisorTest extends EasyMockSupport
Assert.assertEquals(30 * 60, payload.getDurationSeconds()); Assert.assertEquals(30 * 60, payload.getDurationSeconds());
} }
/**
 * Checks that the task IO config produced by {@code createTaskIoConfig} reports a
 * rejection-period refresh interval of 30 minutes when the supervisor IO config
 * passed in has a task duration of PT30M.
 */
@Test
public void testCreateTaskIOConfig()
{
  supervisor = getSupervisor(
      1,
      1,
      false,
      "PT30M",
      null,
      null,
      RabbitStreamSupervisorTest.dataSchema,
      tuningConfig
  );

  // Built up front so the call under test below stays short and readable.
  final RabbitStreamSupervisorIOConfig supervisorIoConfig = new RabbitStreamSupervisorIOConfig(
      STREAM,                // stream
      URI,                   // uri
      INPUT_FORMAT,          // input format
      1,                     // replicas
      1,                     // task count
      new Period("PT30M"),   // task duration
      null,                  // consumer properties
      null,                  // autoscaler config
      400L,                  // poll timeout
      new Period("P1D"),     // start delay
      new Period("PT30M"),   // period
      new Period("PT30S"),   // completion timeout
      false,                 // use earliest
      null,                  // late message rejection
      null,                  // early message rejection
      null,                  // late message rejection start date time
      1
  );

  final SeekableStreamIndexTaskIOConfig taskIoConfig = supervisor.createTaskIoConfig(
      1,
      ImmutableMap.of(),
      ImmutableMap.of(),
      "test",
      null,
      null,
      ImmutableSet.of(),
      supervisorIoConfig
  );

  // A 30-minute task duration is expected to surface as a 30-minute refresh interval.
  Assert.assertEquals(30L, taskIoConfig.getRefreshRejectionPeriodsInMinutes().longValue());
}
} }

View File

@ -63,7 +63,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Kafk
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides, @JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides,
@JsonProperty("multiTopic") @Nullable Boolean multiTopic @JsonProperty("multiTopic") @Nullable Boolean multiTopic,
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
) )
{ {
super( super(
@ -76,7 +77,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Kafk
useTransaction, useTransaction,
minimumMessageTime, minimumMessageTime,
maximumMessageTime, maximumMessageTime,
inputFormat inputFormat,
refreshRejectionPeriodsInMinutes
); );
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
@ -107,7 +109,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Kafk
DateTime minimumMessageTime, DateTime minimumMessageTime,
DateTime maximumMessageTime, DateTime maximumMessageTime,
InputFormat inputFormat, InputFormat inputFormat,
KafkaConfigOverrides configOverrides KafkaConfigOverrides configOverrides,
Long refreshRejectionPeriodsInMinutes
) )
{ {
this( this(
@ -124,7 +127,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Kafk
maximumMessageTime, maximumMessageTime,
inputFormat, inputFormat,
configOverrides, configOverrides,
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC,
refreshRejectionPeriodsInMinutes
); );
} }

View File

@ -218,7 +218,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
maximumMessageTime, maximumMessageTime,
ioConfig.getInputFormat(), ioConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(), kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic() kafkaIoConfig.isMultiTopic(),
ioConfig.getTaskDuration().getStandardMinutes()
); );
} }

View File

@ -120,6 +120,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -357,7 +358,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
Assert.assertTrue(task.supportsQueries()); Assert.assertTrue(task.supportsQueries());
@ -413,7 +415,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final ListenableFuture<TaskStatus> future = runTask(task); final ListenableFuture<TaskStatus> future = runTask(task);
@ -461,7 +464,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
@ -496,7 +500,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
null, null,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -537,7 +542,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -588,7 +594,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final ListenableFuture<TaskStatus> future = runTask(task); final ListenableFuture<TaskStatus> future = runTask(task);
@ -665,7 +672,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final ListenableFuture<TaskStatus> future = runTask(task); final ListenableFuture<TaskStatus> future = runTask(task);
@ -768,7 +776,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final ListenableFuture<TaskStatus> future = runTask(task); final ListenableFuture<TaskStatus> future = runTask(task);
@ -894,7 +903,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final ListenableFuture<TaskStatus> future = runTask(task); final ListenableFuture<TaskStatus> future = runTask(task);
@ -972,7 +982,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final ListenableFuture<TaskStatus> future = runTask(task); final ListenableFuture<TaskStatus> future = runTask(task);
@ -1035,7 +1046,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final KafkaIndexTask staleReplica = createTask( final KafkaIndexTask staleReplica = createTask(
@ -1051,7 +1063,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1117,7 +1130,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
DateTimes.of("2010"), DateTimes.of("2010"),
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1165,7 +1179,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
DateTimes.of("2010"), DateTimes.of("2010"),
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1222,7 +1237,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1293,7 +1309,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
new TestKafkaInputFormat(INPUT_FORMAT), new TestKafkaInputFormat(INPUT_FORMAT),
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
Assert.assertTrue(task.supportsQueries()); Assert.assertTrue(task.supportsQueries());
@ -1365,7 +1382,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
KAFKA_INPUT_FORMAT, KAFKA_INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
Assert.assertTrue(task.supportsQueries()); Assert.assertTrue(task.supportsQueries());
@ -1416,7 +1434,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1451,7 +1470,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1497,7 +1517,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1548,7 +1569,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1586,7 +1608,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1678,7 +1701,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1744,7 +1768,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final KafkaIndexTask task2 = createTask( final KafkaIndexTask task2 = createTask(
@ -1760,7 +1785,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1808,7 +1834,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final KafkaIndexTask task2 = createTask( final KafkaIndexTask task2 = createTask(
@ -1824,7 +1851,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1874,7 +1902,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final KafkaIndexTask task2 = createTask( final KafkaIndexTask task2 = createTask(
@ -1890,7 +1919,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1938,7 +1968,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1984,7 +2015,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final KafkaIndexTask task2 = createTask( final KafkaIndexTask task2 = createTask(
@ -2000,7 +2032,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2050,7 +2083,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2084,7 +2118,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2134,7 +2169,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2177,7 +2213,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2228,7 +2265,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2299,7 +2337,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2336,7 +2375,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2383,7 +2423,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
), ),
context context
); );
@ -2427,7 +2468,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2466,7 +2508,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2547,7 +2590,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2607,7 +2651,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
final ListenableFuture<TaskStatus> future = runTask(task); final ListenableFuture<TaskStatus> future = runTask(task);
@ -2630,7 +2675,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2676,7 +2722,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2708,7 +2755,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -2968,7 +3016,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -3029,7 +3078,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -3101,7 +3151,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -3175,7 +3226,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -3227,7 +3279,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -3281,7 +3334,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );

View File

@ -107,6 +107,7 @@ import org.easymock.EasyMock;
import org.easymock.EasyMockSupport; import org.easymock.EasyMockSupport;
import org.easymock.IAnswer; import org.easymock.IAnswer;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -496,7 +497,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null, null,
null, null,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
), ),
new KafkaIndexTaskTuningConfig( new KafkaIndexTaskTuningConfig(
null, null,
@ -5641,7 +5643,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
minimumMessageTime, minimumMessageTime,
maximumMessageTime, maximumMessageTime,
INPUT_FORMAT, INPUT_FORMAT,
null null,
Duration.standardHours(2).getStandardMinutes()
), ),
Collections.emptyMap(), Collections.emptyMap(),
OBJECT_MAPPER OBJECT_MAPPER

View File

@ -78,7 +78,8 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
@JsonProperty("endpoint") String endpoint, @JsonProperty("endpoint") String endpoint,
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId @JsonProperty("awsExternalId") String awsExternalId,
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
) )
{ {
super( super(
@ -89,7 +90,8 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
useTransaction, useTransaction,
minimumMessageTime, minimumMessageTime,
maximumMessageTime, maximumMessageTime,
inputFormat inputFormat,
refreshRejectionPeriodsInMinutes
); );
Preconditions.checkArgument( Preconditions.checkArgument(
getEndSequenceNumbers().getPartitionSequenceNumberMap() getEndSequenceNumbers().getPartitionSequenceNumberMap()
@ -117,7 +119,8 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
String endpoint, String endpoint,
Integer fetchDelayMillis, Integer fetchDelayMillis,
String awsAssumedRoleArn, String awsAssumedRoleArn,
String awsExternalId String awsExternalId,
Long refreshRejectionPeriodsInMinutes
) )
{ {
this( this(
@ -135,7 +138,8 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
endpoint, endpoint,
fetchDelayMillis, fetchDelayMillis,
awsAssumedRoleArn, awsAssumedRoleArn,
awsExternalId awsExternalId,
refreshRejectionPeriodsInMinutes
); );
} }

View File

@ -145,7 +145,8 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
ioConfig.getEndpoint(), ioConfig.getEndpoint(),
ioConfig.getFetchDelayMillis(), ioConfig.getFetchDelayMillis(),
ioConfig.getAwsAssumedRoleArn(), ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId() ioConfig.getAwsExternalId(),
ioConfig.getTaskDuration().getStandardMinutes()
); );
} }

View File

@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.indexing.IOConfig; import org.apache.druid.segment.indexing.IOConfig;
import org.hamcrest.CoreMatchers; import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -267,7 +268,8 @@ public class KinesisIOConfigTest
"endpoint", "endpoint",
2000, 2000,
"awsAssumedRoleArn", "awsAssumedRoleArn",
"awsExternalId" "awsExternalId",
Duration.standardHours(2).getStandardMinutes()
); );
final byte[] json = mapper.writeValueAsBytes(currentConfig); final byte[] json = mapper.writeValueAsBytes(currentConfig);

View File

@ -40,6 +40,7 @@ import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType; import org.apache.druid.server.security.ResourceType;
import org.joda.time.Duration;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -94,7 +95,8 @@ public class KinesisIndexTaskSerdeTest
"endpoint", "endpoint",
null, null,
null, null,
null null,
Duration.standardHours(2).getStandardMinutes()
); );
private static final String ACCESS_KEY = "test-access-key"; private static final String ACCESS_KEY = "test-access-key";
private static final String SECRET_KEY = "test-secret-key"; private static final String SECRET_KEY = "test-secret-key";

View File

@ -89,6 +89,7 @@ import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -785,7 +786,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint", "awsEndpoint",
null, null,
null, null,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -847,7 +849,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint", "awsEndpoint",
null, null,
null, null,
null null,
Duration.standardHours(2).getStandardMinutes()
) )
); );
@ -1946,7 +1949,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint", "awsEndpoint",
null, null,
null, null,
null null,
Duration.standardHours(2).getStandardMinutes()
), ),
context context
); );
@ -2108,7 +2112,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint", "awsEndpoint",
null, null,
null, null,
null null,
Duration.standardHours(2).getStandardMinutes()
), ),
context context
); );
@ -2309,7 +2314,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint", "awsEndpoint",
null, null,
null, null,
null null,
Duration.standardHours(2).getStandardMinutes()
), ),
null null
); );

View File

@ -94,6 +94,7 @@ import org.easymock.EasyMock;
import org.easymock.EasyMockSupport; import org.easymock.EasyMockSupport;
import org.easymock.IAnswer; import org.easymock.IAnswer;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -5563,7 +5564,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
"awsEndpoint", "awsEndpoint",
null, null,
null, null,
null null,
Duration.standardHours(2).getStandardMinutes()
), ),
Collections.emptyMap(), Collections.emptyMap(),
false, false,

View File

@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever; import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
@ -244,32 +243,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
); );
} }
/**
 * Checks whether a row's timestamp lies inside the message-time window configured on the IO config.
 * A bound that is absent from the IO config is not enforced. When debug logging is enabled, the
 * violated bound is logged for rejected rows.
 *
 * @param row the row whose timestamp is checked
 * @return {@code true} if the timestamp is neither before the minimum nor after the maximum message time
 */
public boolean withinMinMaxRecordTime(final InputRow row)
{
  final boolean tooEarly = ioConfig.getMinimumMessageTime().isPresent()
                           && ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp());
  final boolean tooLate = ioConfig.getMaximumMessageTime().isPresent()
                          && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp());

  if (!(tooEarly || tooLate)) {
    return true;
  }

  // The row is rejected; report which bound it violated (the minimum takes precedence).
  if (log.isDebugEnabled()) {
    if (tooEarly) {
      log.debug(
          "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
          row.getTimestamp(),
          ioConfig.getMinimumMessageTime().get()
      );
    } else {
      log.debug(
          "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
          row.getTimestamp(),
          ioConfig.getMaximumMessageTime().get()
      );
    }
  }
  return false;
}
@Override @Override
public String getTaskAllocatorId() public String getTaskAllocatorId()
{ {

View File

@ -42,6 +42,7 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
private final Optional<DateTime> minimumMessageTime; private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime; private final Optional<DateTime> maximumMessageTime;
private final InputFormat inputFormat; private final InputFormat inputFormat;
private final Long refreshRejectionPeriodsInMinutes;
public SeekableStreamIndexTaskIOConfig( public SeekableStreamIndexTaskIOConfig(
@Nullable final Integer taskGroupId, // can be null for backward compabitility @Nullable final Integer taskGroupId, // can be null for backward compabitility
@ -51,7 +52,8 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
final Boolean useTransaction, final Boolean useTransaction,
final DateTime minimumMessageTime, final DateTime minimumMessageTime,
final DateTime maximumMessageTime, final DateTime maximumMessageTime,
@Nullable final InputFormat inputFormat @Nullable final InputFormat inputFormat,
@Nullable final Long refreshRejectionPeriodsInMinutes // can be null for backward compabitility
) )
{ {
this.taskGroupId = taskGroupId; this.taskGroupId = taskGroupId;
@ -62,6 +64,7 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
this.inputFormat = inputFormat; this.inputFormat = inputFormat;
this.refreshRejectionPeriodsInMinutes = refreshRejectionPeriodsInMinutes;
Preconditions.checkArgument( Preconditions.checkArgument(
startSequenceNumbers.getStream().equals(endSequenceNumbers.getStream()), startSequenceNumbers.getStream().equals(endSequenceNumbers.getStream()),
@ -134,4 +137,11 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
{ {
return inputFormat; return inputFormat;
} }
/**
 * Returns the configured refresh interval, in minutes, for the message rejection periods.
 *
 * @return the interval in minutes, or {@code null} when it was not supplied; the constructor
 *         accepts {@code null} for backward compatibility
 */
@Nullable
@JsonProperty
public Long getRefreshRejectionPeriodsInMinutes()
{
return refreshRejectionPeriodsInMinutes;
}
} }

View File

@ -76,6 +76,7 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionHandler;
@ -246,6 +247,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private final Map<PartitionIdType, Long> partitionsThroughput = new HashMap<>(); private final Map<PartitionIdType, Long> partitionsThroughput = new HashMap<>();
private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;
public SeekableStreamIndexTaskRunner( public SeekableStreamIndexTaskRunner(
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task, final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
@Nullable final InputRowParser<ByteBuffer> parser, @Nullable final InputRowParser<ByteBuffer> parser,
@ -267,6 +271,18 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
this.ingestionState = IngestionState.NOT_STARTED; this.ingestionState = IngestionState.NOT_STARTED;
this.lockGranularityToUse = lockGranularityToUse; this.lockGranularityToUse = lockGranularityToUse;
minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) {
Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d")
.scheduleWithFixedDelay(
this::refreshMinMaxMessageTime,
ioConfig.getRefreshRejectionPeriodsInMinutes(),
ioConfig.getRefreshRejectionPeriodsInMinutes(),
TimeUnit.MINUTES
);
}
resetNextCheckpointTime(); resetNextCheckpointTime();
} }
@ -388,7 +404,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
inputRowSchema, inputRowSchema,
task.getDataSchema().getTransformSpec(), task.getDataSchema().getTransformSpec(),
toolbox.getIndexingTmpDir(), toolbox.getIndexingTmpDir(),
row -> row != null && task.withinMinMaxRecordTime(row), row -> row != null && withinMinMaxRecordTime(row),
rowIngestionMeters, rowIngestionMeters,
parseExceptionHandler parseExceptionHandler
); );
@ -2092,4 +2108,35 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
protected abstract boolean isEndOffsetExclusive(); protected abstract boolean isEndOffsetExclusive();
protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference(); protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
/**
 * Slides the accepted message-time window forward by one refresh period.
 * <p>
 * Both {@code minMessageTime} and {@code maxMessageTime} are advanced by
 * {@code ioConfig.getRefreshRejectionPeriodsInMinutes()} minutes. The constructor schedules this
 * method at that same fixed delay, and only when the refresh period is non-null.
 */
private void refreshMinMaxMessageTime()
{
  // Shift by long millis instead of Long#intValue() minutes so that a large configured period
  // is never silently truncated to a different (possibly negative) value.
  final long shiftMillis = TimeUnit.MINUTES.toMillis(ioConfig.getRefreshRejectionPeriodsInMinutes());

  minMessageTime = minMessageTime.plus(shiftMillis);
  maxMessageTime = maxMessageTime.plus(shiftMillis);

  // The logger formats its arguments itself, so no intermediate StringUtils.format is needed.
  log.info("Updated min and max message times to %s and %s respectively.", minMessageTime, maxMessageTime);
}
/**
 * Checks whether a row's timestamp lies inside the runner's current message-time window, i.e. it
 * is not before {@code minMessageTime} and not after {@code maxMessageTime}. When debug logging
 * is enabled, the violated bound is logged for rejected rows.
 *
 * @param row the row whose timestamp is checked
 * @return {@code true} if the row should be kept, {@code false} if it falls outside the window
 */
public boolean withinMinMaxRecordTime(final InputRow row)
{
  final boolean tooEarly = minMessageTime.isAfter(row.getTimestamp());
  final boolean tooLate = maxMessageTime.isBefore(row.getTimestamp());

  if (!(tooEarly || tooLate)) {
    return true;
  }

  // The row is rejected; report which bound it violated (the minimum takes precedence).
  if (log.isDebugEnabled()) {
    if (tooEarly) {
      log.debug(
          "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
          row.getTimestamp(),
          minMessageTime
      );
    } else {
      log.debug(
          "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
          row.getTimestamp(),
          maxMessageTime
      );
    }
  }
  return false;
}
} }

View File

@ -44,6 +44,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceType; import org.apache.druid.server.security.ResourceType;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -380,7 +381,8 @@ public class SeekableStreamIndexTaskRunnerAuthTest
false, false,
DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc().minusDays(2),
DateTimes.nowUtc(), DateTimes.nowUtc(),
new CsvInputFormat(null, null, true, null, 0, null) new CsvInputFormat(null, null, true, null, 0, null),
Duration.standardHours(2).getStandardMinutes()
); );
} }
} }

View File

@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@RunWith(MockitoJUnitRunner.class)
public class SeekableStreamIndexTaskRunnerTest
{
@Mock
private InputRow row;
@Mock
private SeekableStreamIndexTask task;
@Test
public void testWithinMinMaxTime()
{
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("d1"),
new StringDimensionSchema("d2")
)
);
DataSchema schema =
DataSchema.builder()
.withDataSource("datasource")
.withTimestamp(new TimestampSpec(null, null, null))
.withDimensions(dimensionsSpec)
.withGranularity(
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null)
)
.build();
SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
SeekableStreamIndexTaskIOConfig<String, String> ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class);
SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class);
DateTime now = DateTimes.nowUtc();
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(120L);
Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().plusHours(2)));
Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().minusHours(2)));
Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null));
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
Mockito.when(task.getDataSchema()).thenReturn(schema);
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK);
Mockito.when(row.getTimestamp()).thenReturn(now);
Assert.assertTrue(runner.withinMinMaxRecordTime(row));
Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1));
Assert.assertFalse(runner.withinMinMaxRecordTime(row));
Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1));
Assert.assertFalse(runner.withinMinMaxRecordTime(row));
}
@Test
public void testWithinMinMaxTimeNotPopulated()
{
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("d1"),
new StringDimensionSchema("d2")
)
);
DataSchema schema =
DataSchema.builder()
.withDataSource("datasource")
.withTimestamp(new TimestampSpec(null, null, null))
.withDimensions(dimensionsSpec)
.withGranularity(
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null)
)
.build();
SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
SeekableStreamIndexTaskIOConfig<String, String> ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class);
SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class);
DateTime now = DateTimes.nowUtc();
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
// min max time not populated.
Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent());
Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent());
Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null));
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
Mockito.when(task.getDataSchema()).thenReturn(schema);
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK);
Assert.assertTrue(runner.withinMinMaxRecordTime(row));
Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1));
Assert.assertTrue(runner.withinMinMaxRecordTime(row));
Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1));
Assert.assertTrue(runner.withinMinMaxRecordTime(row));
}
static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner
{
public TestasbleSeekableStreamIndexTaskRunner(
SeekableStreamIndexTask task,
@Nullable InputRowParser parser,
AuthorizerMapper authorizerMapper,
LockGranularity lockGranularityToUse
)
{
super(task, parser, authorizerMapper, lockGranularityToUse);
}
@Override
protected boolean isEndOfShard(Object seqNum)
{
return false;
}
@Nullable
@Override
protected TreeMap<Integer, Map> getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString)
{
return null;
}
@Override
protected Object getNextStartOffset(Object sequenceNumber)
{
return null;
}
@Override
protected SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata(ObjectMapper mapper, Object object)
{
return null;
}
@Override
protected List<OrderedPartitionableRecord> getRecords(RecordSupplier recordSupplier, TaskToolbox toolbox)
{
return null;
}
@Override
protected SeekableStreamDataSourceMetadata createDataSourceMetadata(SeekableStreamSequenceNumbers partitions)
{
return null;
}
@Override
protected OrderedSequenceNumber createSequenceNumber(Object sequenceNumber)
{
return null;
}
@Override
protected boolean isEndOffsetExclusive()
{
return false;
}
@Override
protected TypeReference<List<SequenceMetadata>> getSequenceMetadataTypeReference()
{
return null;
}
@Override
protected void possiblyResetDataSourceMetadata(TaskToolbox toolbox, RecordSupplier recordSupplier, Set assignment)
{
}
}
}

View File

@ -196,7 +196,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
true, true,
minimumMessageTime, minimumMessageTime,
maximumMessageTime, maximumMessageTime,
ioConfig.getInputFormat() ioConfig.getInputFormat(),
ioConfig.getTaskDuration().getStandardMinutes()
) )
{ {
}; };

View File

@ -2807,7 +2807,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
true, true,
minimumMessageTime, minimumMessageTime,
maximumMessageTime, maximumMessageTime,
ioConfig.getInputFormat() ioConfig.getInputFormat(),
ioConfig.getTaskDuration().getStandardMinutes()
) )
{ {
}; };
@ -3166,7 +3167,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
true, true,
minimumMessageTime, minimumMessageTime,
maximumMessageTime, maximumMessageTime,
ioConfig.getInputFormat() ioConfig.getInputFormat(),
ioConfig.getTaskDuration().getStandardMinutes()
) )
{ {
}; };