Time-based checkpointing for Kafka Indexing Service (#5255)

* time-based checkpointing

* add test and fix issue

* fix comments

* fix formatting

* update docs
Parag Jain 2018-02-15 22:57:02 -06:00 committed by Slim
parent 20a3164180
commit fba13d8978
8 changed files with 135 additions and 26 deletions

kafka-ingestion.md

@@ -116,7 +116,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`type`|String|The indexing task type, this should always be `kafka`.|yes|
|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows.|no (default == 5000000)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff happens when `maxRowsPerSegment` is reached or after `intermediateHandoffPeriod` elapses, whichever happens first.|no (default == 5000000)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
@@ -130,6 +130,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
|`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)|
|`segmentWriteOutMediumFactory`|String|Segment write-out medium to use when creating segments. See [Indexing Service Configuration](../configuration/indexing-service.html) page, "SegmentWriteOutMediumFactory" section for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)|
|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff happens when `maxRowsPerSegment` is reached or after `intermediateHandoffPeriod` elapses, whichever happens first.|no (default == P2147483647D)|
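
To make the interplay of the two limits concrete, the sketch below (not Druid code; `HandoffTrigger` and its members are hypothetical stand-ins) captures the "whichever happens first" rule described in the rows above, including the `P2147483647D` default that effectively disables the time condition:

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;

// Hypothetical sketch of the handoff rule described above: segments are handed
// off when either the row limit is reached or the handoff period elapses.
class HandoffTrigger
{
  private final int maxRowsPerSegment;
  private final Period intermediateHandoffPeriod;
  private long nextHandoffTime;

  HandoffTrigger(int maxRowsPerSegment, Period intermediateHandoffPeriod)
  {
    this.maxRowsPerSegment = maxRowsPerSegment;
    // With the default of P2147483647D the time condition effectively never
    // fires, and only maxRowsPerSegment triggers handoff.
    this.intermediateHandoffPeriod = intermediateHandoffPeriod;
    rearm();
  }

  // Deadline for the next time-based handoff, one full period from now.
  void rearm()
  {
    nextHandoffTime = DateTime.now(DateTimeZone.UTC).plus(intermediateHandoffPeriod).getMillis();
  }

  boolean shouldHandOff(int rowsInSegment)
  {
    return rowsInSegment >= maxRowsPerSegment || System.currentTimeMillis() > nextHandoffTime;
  }
}
```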
#### IndexSpec
@@ -312,12 +313,12 @@ In this way, configuration changes can be applied without requiring any pause in
### On the Subject of Segments
Each Kafka Indexing Task puts events consumed from the Kafka partitions assigned to it into a single segment for each
segment-granularity interval until the maxRowsPerSegment limit is reached; at that point a new partition for this
segment granularity is created for further events. The Kafka Indexing Task also does incremental hand-offs, meaning
that the segments created by a task are not all held until the task duration is over. As soon as the maxRowsPerSegment
limit is hit, all segments held by the task at that point in time are handed off and a new set of segments is created
for further events. This means that the task can run for long durations without accumulating old segments locally on
Middle Manager nodes, and doing so is encouraged.
segment-granularity interval until the maxRowsPerSegment or intermediateHandoffPeriod limit is reached; at that point
a new partition for this segment granularity is created for further events. The Kafka Indexing Task also does
incremental hand-offs, meaning that the segments created by a task are not all held until the task duration is over.
As soon as the maxRowsPerSegment or intermediateHandoffPeriod limit is hit, all segments held by the task at that
point in time are handed off and a new set of segments is created for further events. This means that the task can run
for long durations without accumulating old segments locally on Middle Manager nodes, and doing so is encouraged.
The Kafka Indexing Service may still produce some small segments. Let's say the task duration is 4 hours, segment
granularity is set to HOUR, and the Supervisor was started at 9:10; then, after 4 hours at 13:10, a new set of tasks will be started and
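
To make the small-segment arithmetic above concrete, here is a short Joda-Time sketch (illustrative only; the date is arbitrary) of why a task started at 9:10 with HOUR granularity writes a partial first segment:

```java
import org.joda.time.DateTime;
import org.joda.time.Minutes;

public class SegmentBoundaryExample
{
  public static void main(String[] args)
  {
    // Supervisor starts tasks at 9:10; segment granularity is HOUR.
    DateTime taskStart = new DateTime(2018, 2, 15, 9, 10);
    DateTime firstSegmentEnd = taskStart.hourOfDay().roundCeilingCopy(); // 10:00

    // Only 50 minutes of data can land in the 9:00-10:00 segment, so it is
    // likely small; the same effect recurs at the 13:10 task rollover.
    System.out.println(Minutes.minutesBetween(taskStart, firstSegmentEnd)); // PT50M
  }
}
```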

KafkaIndexTask.java

@@ -179,7 +179,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> maxEndOffsets = new HashMap<>();
private final Map<Integer, Long> lastPersistedOffsets = new ConcurrentHashMap<>();
private TaskToolbox toolbox;
@@ -231,6 +230,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private volatile boolean pauseRequested = false;
private volatile long pauseMillis = 0;
private volatile long nextCheckpointTime;
// This value can be tuned in some tests
private long pollRetryMs = 30000;
@@ -273,12 +273,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.authorizerMapper = authorizerMapper;
this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
this.maxEndOffsets.putAll(endOffsets.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
integerLongEntry -> Long.MAX_VALUE
)));
this.topic = ioConfig.getStartPartitions().getTopic();
this.sequences = new CopyOnWriteArrayList<>();
@@ -288,6 +282,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
} else {
useLegacy = true;
}
resetNextCheckpointTime();
}
private void resetNextCheckpointTime()
{
nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis();
}
@VisibleForTesting
@@ -444,7 +444,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
maxEndOffsets,
endOffsets,
false
));
} else {
@@ -452,7 +452,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
0,
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
ioConfig.getStartPartitions().getPartitionOffsetMap(),
maxEndOffsets,
endOffsets,
false
));
}
@@ -775,7 +775,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) {
if (System.currentTimeMillis() > nextCheckpointTime) {
sequenceToCheckpoint = sequences.get(sequences.size() - 1);
}
if (sequenceToCheckpoint != null && stillReading) {
Preconditions.checkArgument(
sequences.get(sequences.size() - 1)
.getSequenceName()
@@ -1547,6 +1551,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
resetNextCheckpointTime();
latestSequence.setEndOffsets(offsets);
if (finish) {
@@ -1559,7 +1564,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
latestSequence.getSequenceId() + 1,
StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
offsets,
maxEndOffsets,
endOffsets,
false
);
sequences.add(newSequence);
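
Pulling the hunks above together: the task arms a timer at construction, checks it in the read loop, and re-arms it whenever a checkpoint's end offsets are set, so handoffs recur every `intermediateHandoffPeriod` even if no sequence ever hits its row limit. A condensed sketch of just that flow (names like `Sequence` are stand-ins; supervisor coordination and offset bookkeeping are omitted):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;

// Condensed sketch (not the full task) of the time-based checkpointing above.
class TimeBasedCheckpointing
{
  static class Sequence {}

  private final Period intermediateHandoffPeriod;
  private final List<Sequence> sequences = new CopyOnWriteArrayList<>();
  private volatile long nextCheckpointTime;

  TimeBasedCheckpointing(Period intermediateHandoffPeriod)
  {
    this.intermediateHandoffPeriod = intermediateHandoffPeriod;
    resetNextCheckpointTime(); // armed once in the constructor, as in the diff
  }

  private void resetNextCheckpointTime()
  {
    nextCheckpointTime = DateTime.now(DateTimeZone.UTC).plus(intermediateHandoffPeriod).getMillis();
  }

  // Read loop: even when no sequence hit its row limit, an expired timer
  // forces the most recently created sequence to be checkpointed.
  Sequence chooseSequenceToCheckpoint(Sequence sequenceToCheckpoint)
  {
    if (System.currentTimeMillis() > nextCheckpointTime && !sequences.isEmpty()) {
      sequenceToCheckpoint = sequences.get(sequences.size() - 1);
    }
    return sequenceToCheckpoint;
  }

  // Called when the supervisor accepts the checkpoint and sets end offsets,
  // so the next time-based handoff is again one full period away.
  void onEndOffsetsSet()
  {
    resetNextCheckpointTime();
  }
}
```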

KafkaTuningConfig.java

@@ -49,6 +49,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final boolean resetOffsetAutomatically;
@Nullable
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
private final Period intermediateHandoffPeriod;
@JsonCreator
public KafkaTuningConfig(
@@ -63,7 +64,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod
)
{
// Cannot be a static because default basePersistDirectory is unique per-instance
@@ -87,6 +89,9 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
? DEFAULT_RESET_OFFSET_AUTOMATICALLY
: resetOffsetAutomatically;
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
this.intermediateHandoffPeriod = intermediateHandoffPeriod == null
? new Period().withDays(Integer.MAX_VALUE)
: intermediateHandoffPeriod;
}
public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
@@ -102,7 +107,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
config.reportParseExceptions,
config.handoffConditionTimeout,
config.resetOffsetAutomatically,
config.segmentWriteOutMediumFactory
config.segmentWriteOutMediumFactory,
config.intermediateHandoffPeriod
);
}
@@ -185,6 +191,12 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return segmentWriteOutMediumFactory;
}
@JsonProperty
public Period getIntermediateHandoffPeriod()
{
return intermediateHandoffPeriod;
}
public KafkaTuningConfig withBasePersistDirectory(File dir)
{
return new KafkaTuningConfig(
@@ -198,7 +210,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
segmentWriteOutMediumFactory
segmentWriteOutMediumFactory,
intermediateHandoffPeriod
);
}
@@ -221,7 +234,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) &&
Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory);
Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod);
}
@Override
@@ -237,7 +251,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
segmentWriteOutMediumFactory
segmentWriteOutMediumFactory,
intermediateHandoffPeriod
);
}
@@ -255,6 +270,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
", handoffConditionTimeout=" + handoffConditionTimeout +
", resetOffsetAutomatically=" + resetOffsetAutomatically +
", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory +
", intermediateHandoffPeriod=" + intermediateHandoffPeriod +
'}';
}
}
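
The `@JsonCreator`/`@JsonProperty` wiring above is what lets the new field be set from the supervisor spec JSON as an ISO8601 period. A minimal round-trip sketch, assuming jackson-databind and jackson-datatype-joda on the classpath; `MiniTuningConfig` is a hypothetical stand-in for KafkaTuningConfig with only the new field:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import org.joda.time.Period;

public class MiniTuningConfig
{
  private final Period intermediateHandoffPeriod;

  @JsonCreator
  public MiniTuningConfig(
      @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod
  )
  {
    // Same default as the diff: effectively "never" (P2147483647D).
    this.intermediateHandoffPeriod = intermediateHandoffPeriod == null
                                     ? new Period().withDays(Integer.MAX_VALUE)
                                     : intermediateHandoffPeriod;
  }

  @JsonProperty
  public Period getIntermediateHandoffPeriod()
  {
    return intermediateHandoffPeriod;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper().registerModule(new JodaModule());
    MiniTuningConfig configured =
        mapper.readValue("{\"intermediateHandoffPeriod\": \"PT30M\"}", MiniTuningConfig.class);
    MiniTuningConfig defaulted = mapper.readValue("{}", MiniTuningConfig.class);
    System.out.println(configured.getIntermediateHandoffPeriod()); // PT30M
    System.out.println(defaulted.getIntermediateHandoffPeriod());  // P2147483647D
  }
}
```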

KafkaSupervisorSpec.java

@@ -89,6 +89,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
null,
null,
null,
null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");

KafkaSupervisorTuningConfig.java

@@ -56,7 +56,8 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
@JsonProperty("chatRetries") Long chatRetries,
@JsonProperty("httpTimeout") Period httpTimeout,
@JsonProperty("shutdownTimeout") Period shutdownTimeout,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod
)
{
super(
@@ -70,7 +71,8 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
segmentWriteOutMediumFactory
segmentWriteOutMediumFactory,
intermediateHandoffPeriod
);
this.workerThreads = workerThreads;
@@ -137,6 +139,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
", httpTimeout=" + httpTimeout +
", shutdownTimeout=" + shutdownTimeout +
", offsetFetchPeriod=" + offsetFetchPeriod +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
'}';
}

KafkaIndexTaskTest.java

@@ -186,6 +186,7 @@ public class KafkaIndexTaskTest
private boolean resetOffsetAutomatically = false;
private boolean doHandoff = true;
private Integer maxRowsPerSegment = null;
private Period intermediateHandoffPeriod = null;
private TaskToolboxFactory toolboxFactory;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
@@ -524,6 +525,84 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
}
@Test(timeout = 60_000L)
public void testTimeBasedIncrementalHandOff() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
final String baseSequenceName = "sequence0";
// as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen
maxRowsPerSegment = Integer.MAX_VALUE;
intermediateHandoffPeriod = new Period().withSeconds(0);
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records.subList(0, 2)) {
kafkaProducer.send(record).get();
}
}
Map<String, String> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
// Checkpointing will happen at checkpoint
final KafkaPartitions checkpoint = new KafkaPartitions(topic, ImmutableMap.of(0, 1L, 1, 0L));
final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L));
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
baseSequenceName,
startPartitions,
endPartitions,
consumerProps,
true,
false,
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
// task will pause for checkpointing
while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(10);
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets());
Assert.assertTrue(checkpoint.getPartitionOffsetMap().equals(currentOffsets));
task.setEndOffsets(currentOffsets, true, false);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
Assert.assertTrue(checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
baseSequenceName,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap()))
)
));
// Check metrics
Assert.assertEquals(2, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
}
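
The test drives the new code path by inverting the two limits: maxRowsPerSegment is effectively infinite while the handoff period is zero, so nextCheckpointTime is already "now" when the read loop starts and the very first timer check fires after a single record per poll. A toy illustration of that timing (hypothetical, not part of the test):

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;

public class ZeroPeriodDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    Period zero = new Period().withSeconds(0);
    long nextCheckpointTime = DateTime.now(DateTimeZone.UTC).plus(zero).getMillis();

    Thread.sleep(5); // any elapsed time at all pushes "now" past the deadline

    // With a zero period the first timer check fires, so the task pauses for
    // checkpointing after reading as little as one record.
    System.out.println(System.currentTimeMillis() > nextCheckpointTime); // true
  }
}
```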
@Test(timeout = 60_000L)
public void testRunWithMinimumMessageTime() throws Exception
{
@@ -1708,7 +1787,8 @@ public class KafkaIndexTaskTest
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
null
null,
intermediateHandoffPeriod
);
final Map<String, Object> context = isIncrementalHandoffSupported
? ImmutableMap.of(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
@@ -1746,6 +1826,7 @@ public class KafkaIndexTaskTest
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
null,
null
);
if (isIncrementalHandoffSupported) {

KafkaTuningConfigTest.java

@@ -112,6 +112,7 @@ public class KafkaTuningConfigTest
true,
5L,
null,
null,
null
);
KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original);

KafkaSupervisorTest.java

@@ -200,6 +200,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null
);