Support Kafka supervisor adopting running tasks between versions (#7212)

* Recompute hash in isTaskCurrent() and added tests

* Fixed checkstyle stuff

* Fixed failing tests

* Make TestableKafkaSupervisorWithCustomIsTaskCurrent static

* Add doc

* baseSequenceName change

* Added comment

* WIP

* Fixed imports

* Undid lambda change for diff sake

* Cleanup

* Added comment

* Reinsert Kafka tests

* Readded kinesis test

* Readd bad partition assignment in kinesis supervisor test

* Nit

* Misnamed var
This commit is contained in:
Justin Borromeo 2019-04-10 18:16:38 -07:00 committed by Jonathan Wei
parent 2f64414ade
commit 2771ed50b0
9 changed files with 2171 additions and 460 deletions

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
@ -30,6 +31,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
public class KafkaIndexTaskTuningConfigTest
{
@ -145,6 +147,100 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
}
@Test
public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
{
KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
1,
null,
2,
10L,
new Period("PT3S"),
new File("/tmp/xxx"),
4,
new IndexSpec(),
true,
true,
5L,
null,
null,
null,
true,
42,
42
);
String serialized = mapper.writeValueAsString(base);
TestModifiedKafkaIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);
Assert.assertEquals(null, deserialized.getExtra());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
}
@Test
public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
{
TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
1,
null,
2,
10L,
new Period("PT3S"),
new File("/tmp/xxx"),
4,
new IndexSpec(),
true,
true,
5L,
null,
null,
null,
true,
42,
42,
"extra string"
);
String serialized = mapper.writeValueAsString(base);
KafkaIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
}
private static KafkaIndexTaskTuningConfig copy(KafkaIndexTaskTuningConfig config)
{
return new KafkaIndexTaskTuningConfig(

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka.test;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
@JsonTypeName("KafkaTuningConfig")
public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuningConfig
{
private final String extra;
@JsonCreator
public TestModifiedKafkaIndexTaskTuningConfig(
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("extra") String extra
)
{
super(
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
indexSpec,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
segmentWriteOutMediumFactory,
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
);
this.extra = extra;
}
@JsonProperty("extra")
public String getExtra()
{
return extra;
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
import org.apache.druid.indexing.kinesis.test.TestModifiedKinesisIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
@ -34,6 +35,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
public class KinesisIndexTaskTuningConfigTest
{
@ -130,6 +132,121 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertFalse(config.isResetOffsetAutomatically());
}
@Test
public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
{
KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
1,
3L,
2,
100L,
new Period("PT3S"),
new File("/tmp/xxx"),
4,
new IndexSpec(),
true,
true,
5L,
true,
false,
1000,
1000,
500,
null,
42,
null,
false,
500,
500,
6000,
new Period("P3D")
);
String serialized = mapper.writeValueAsString(base);
TestModifiedKinesisIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, TestModifiedKinesisIndexTaskTuningConfig.class);
Assert.assertEquals(null, deserialized.getExtra());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize());
Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll());
}
@Test
public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
{
KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
1,
3L,
2,
100L,
new Period("PT3S"),
new File("/tmp/xxx"),
4,
new IndexSpec(),
true,
true,
5L,
true,
false,
1000,
1000,
500,
null,
42,
null,
false,
500,
500,
6000,
new Period("P3D")
);
String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool"));
KinesisIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, KinesisIndexTaskTuningConfig.class);
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize());
Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll());
}
@Test
public void testResetOffsetAndSkipSequenceNotBothTrue() throws Exception
{

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kinesis.test;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
@JsonTypeName("KinesisTuningConfig")
public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTuningConfig
{
private final String extra;
@JsonCreator
public TestModifiedKinesisIndexTaskTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
@JsonProperty("recordBufferSize") Integer recordBufferSize,
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
@JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout,
@JsonProperty("fetchThreads") Integer fetchThreads,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
@JsonProperty("extra") String extra
)
{
super(
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
indexSpec,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
skipSequenceNumberAvailabilityCheck,
recordBufferSize,
recordBufferOfferTimeout,
recordBufferFullWait,
fetchSequenceNumberTimeout,
fetchThreads,
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
maxRecordsPerPoll,
intermediateHandoffPeriod
);
this.extra = extra;
}
public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig base, String extra)
{
super(
base.getMaxRowsInMemory(),
base.getMaxBytesInMemory(),
base.getMaxRowsPerSegment(),
base.getMaxTotalRows(),
base.getIntermediatePersistPeriod(),
base.getBasePersistDirectory(),
base.getMaxPendingPersists(),
base.getIndexSpec(),
base.getBuildV9Directly(),
base.isReportParseExceptions(),
base.getHandoffConditionTimeout(),
base.isResetOffsetAutomatically(),
base.isSkipSequenceNumberAvailabilityCheck(),
base.getRecordBufferSize(),
base.getRecordBufferOfferTimeout(),
base.getRecordBufferFullWait(),
base.getFetchSequenceNumberTimeout(),
base.getFetchThreads(),
base.getSegmentWriteOutMediumFactory(),
base.isLogParseExceptions(),
base.getMaxParseExceptions(),
base.getMaxSavedParseExceptions(),
base.getMaxRecordsPerPoll(),
base.getIntermediateHandoffPeriod()
);
this.extra = extra;
}
@JsonProperty("extra")
public String getExtra()
{
return extra;
}
}

View File

@ -73,6 +73,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -166,6 +167,31 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Optional<DateTime> maximumMessageTime,
@Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
)
{
this(
groupId,
startingSequences,
minimumMessageTime,
maximumMessageTime,
exclusiveStartSequenceNumberPartitions,
generateSequenceName(
startingSequences,
minimumMessageTime,
maximumMessageTime,
spec.getDataSchema(),
taskTuningConfig
)
);
}
TaskGroup(
int groupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,
Set<PartitionIdType> exclusiveStartSequenceNumberPartitions,
String baseSequenceName
)
{
this.groupId = groupId;
this.startingSequences = startingSequences;
@ -175,7 +201,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null
? exclusiveStartSequenceNumberPartitions
: Collections.emptySet();
this.baseSequenceName = generateSequenceName(startingSequences, minimumMessageTime, maximumMessageTime);
this.baseSequenceName = baseSequenceName;
}
int addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> checkpoint)
@ -1153,12 +1179,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
if (metadataUpdateSuccess) {
resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().keySet().forEach(partition -> {
final int groupId = getTaskGroupIdForPartition(partition);
killTaskGroupForPartitions(ImmutableSet.of(partition), "DataSourceMetadata is updated while reset");
activelyReadingTaskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker());
});
resetMetadata.getSeekableStreamSequenceNumbers()
.getPartitionSequenceNumberMap()
.keySet()
.forEach(partition -> {
final int groupId = getTaskGroupIdForPartition(partition);
killTaskGroupForPartitions(
ImmutableSet.of(partition),
"DataSourceMetadata is updated while reset"
);
activelyReadingTaskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker());
});
} else {
throw new ISE("Unable to reset metadata");
}
@ -1256,7 +1288,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
futureTaskIds.add(taskId);
futures.add(
Futures.transform(
taskClient.getStatusAsync(taskId), new Function<SeekableStreamIndexTaskRunner.Status, Boolean>()
taskClient.getStatusAsync(taskId),
new Function<SeekableStreamIndexTaskRunner.Status, Boolean>()
{
@Override
public Boolean apply(SeekableStreamIndexTaskRunner.Status status)
@ -1270,7 +1303,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.keySet()
.forEach(
partition -> addDiscoveredTaskToPendingCompletionTaskGroups(
getTaskGroupIdForPartition(partition),
getTaskGroupIdForPartition(
partition),
taskId,
seekableStreamIndexTask.getIOConfig()
.getStartSequenceNumbers()
@ -1294,7 +1328,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
succeeded = true;
SequenceOffsetType previousOffset = partitionOffsets.putIfAbsent(partition, sequence);
if (previousOffset != null
&& (makeSequenceNumber(previousOffset).compareTo(makeSequenceNumber(sequence))) < 0) {
&& (makeSequenceNumber(previousOffset)
.compareTo(makeSequenceNumber(
sequence))) < 0) {
succeeded = partitionOffsets.replace(partition, previousOffset, sequence);
}
} while (!succeeded);
@ -1310,7 +1346,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId
);
try {
stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
stopTask(taskId, false)
.get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(e, "Exception while stopping task");
@ -1326,7 +1363,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId
);
try {
stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
stopTask(taskId, false)
.get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(e, "Exception while stopping task");
@ -1337,6 +1375,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskGroupId,
k -> {
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
// We reassign the task's original base sequence name (from the existing task) to the
// task group so that the replica segment allocations are the same.
return new TaskGroup(
taskGroupId,
ImmutableMap.copyOf(
@ -1348,7 +1388,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
seekableStreamIndexTask.getIOConfig().getMaximumMessageTime(),
seekableStreamIndexTask.getIOConfig()
.getStartSequenceNumbers()
.getExclusivePartitions()
.getExclusivePartitions(),
seekableStreamIndexTask.getIOConfig().getBaseSequenceName()
);
}
);
@ -1361,6 +1402,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId
);
}
verifySameSequenceNameForAllTasksInGroup(taskGroupId);
}
}
return true;
@ -1370,7 +1412,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return null;
}
}
}, workerExec
)
);
@ -1378,7 +1419,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
List<Boolean> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) {
if (results.get(i) == null) {
@ -1423,10 +1463,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final List<String> taskIds = new ArrayList<>();
for (String taskId : taskGroup.taskIds()) {
final ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture = taskClient.getCheckpointsAsync(
taskId,
true
);
final ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture =
taskClient.getCheckpointsAsync(
taskId,
true
);
futures.add(checkpointsFuture);
taskIds.add(taskId);
}
@ -1611,6 +1652,34 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskGroupList.add(newTaskGroup);
}
// Sanity check to ensure that tasks have the same sequence name as their task group
private void verifySameSequenceNameForAllTasksInGroup(int groupId)
{
String taskGroupSequenceName = activelyReadingTaskGroups.get(groupId).baseSequenceName;
boolean allSequenceNamesMatch =
activelyReadingTaskGroups.get(groupId)
.tasks
.keySet()
.stream()
.map(x -> {
Optional<Task> taskOptional = taskStorage.getTask(x);
if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
return false;
}
@SuppressWarnings("unchecked")
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
(SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
return task.getIOConfig().getBaseSequenceName();
})
.allMatch(taskSeqName -> taskSeqName.equals(taskGroupSequenceName));
if (!allSequenceNamesMatch) {
throw new ISE(
"Base sequence names do not match for the tasks in the task group with ID [%s]",
groupId
);
}
}
private ListenableFuture<Void> stopTask(final String id, final boolean publish)
{
return Futures.transform(
@ -1630,7 +1699,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
private boolean isTaskCurrent(int taskGroupId, String taskId)
@VisibleForTesting
public boolean isTaskCurrent(int taskGroupId, String taskId)
{
Optional<Task> taskOptional = taskStorage.getTask(taskId);
if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
@ -1638,22 +1708,39 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@SuppressWarnings("unchecked")
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional
.get();
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
(SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
// We recompute the sequence name hash for the supervisor's own configuration and compare this to the hash created
// by rehashing the task's sequence name using the most up-to-date class definitions of tuning config and
// data schema. Recomputing both ensures that forwards-compatible tasks won't be killed (which would occur
// if the hash generated using the old class definitions was used).
String taskSequenceName = generateSequenceName(
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
task.getIOConfig().getMinimumMessageTime(),
task.getIOConfig().getMaximumMessageTime(),
task.getDataSchema(),
task.getTuningConfig()
);
String taskSequenceName = task.getIOConfig().getBaseSequenceName();
if (activelyReadingTaskGroups.get(taskGroupId) != null) {
return Preconditions
.checkNotNull(activelyReadingTaskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
.baseSequenceName
.equals(taskSequenceName);
TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
return generateSequenceName(
taskGroup.startingSequences,
taskGroup.minimumMessageTime,
taskGroup.maximumMessageTime,
spec.getDataSchema(),
taskTuningConfig
).equals(taskSequenceName);
} else {
return generateSequenceName(
task.getIOConfig()
.getStartSequenceNumbers()
.getPartitionSequenceNumberMap(),
task.getIOConfig().getMinimumMessageTime(),
task.getIOConfig().getMaximumMessageTime()
task.getIOConfig().getMaximumMessageTime(),
spec.getDataSchema(),
taskTuningConfig
).equals(taskSequenceName);
}
}
@ -1662,7 +1749,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected String generateSequenceName(
Map<PartitionIdType, SequenceOffsetType> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
Optional<DateTime> maximumMessageTime,
DataSchema dataSchema,
SeekableStreamIndexTaskTuningConfig tuningConfig
)
{
StringBuilder sb = new StringBuilder();
@ -1675,17 +1764,17 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
String minMsgTimeStr = (minimumMessageTime.isPresent() ? String.valueOf(minimumMessageTime.get().getMillis()) : "");
String maxMsgTimeStr = (maximumMessageTime.isPresent() ? String.valueOf(maximumMessageTime.get().getMillis()) : "");
String dataSchema, tuningConfig;
String dataSchemaStr, tuningConfigStr;
try {
dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema());
tuningConfig = sortingMapper.writeValueAsString(taskTuningConfig);
dataSchemaStr = sortingMapper.writeValueAsString(dataSchema);
tuningConfigStr = sortingMapper.writeValueAsString(tuningConfig);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
String hashCode = DigestUtils.sha1Hex(dataSchema
+ tuningConfig
String hashCode = DigestUtils.sha1Hex(dataSchemaStr
+ tuningConfigStr
+ partitionOffsetStr
+ minMsgTimeStr
+ maxMsgTimeStr)
@ -2282,7 +2371,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
exclusiveStartSequenceNumberPartitions
)
);
}
}
@ -2474,8 +2562,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.get(groupId)
.exclusiveStartSequenceNumberPartitions;
DateTime minimumMessageTime = activelyReadingTaskGroups.get(groupId).minimumMessageTime.orNull();
DateTime maximumMessageTime = activelyReadingTaskGroups.get(groupId).maximumMessageTime.orNull();
DateTime minimumMessageTime = group.minimumMessageTime.orNull();
DateTime maximumMessageTime = group.maximumMessageTime.orNull();
SeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(
groupId,
@ -2488,7 +2576,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
ioConfig
);
List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> taskList = createIndexTasks(
replicas,
group.baseSequenceName,
@ -2711,7 +2798,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* calculates the taskgroup id that the given partition belongs to.
* different between Kafka/Kinesis since Kinesis uses String as partition id
*
* @param partition paritition id
* @param partition partition id
*
* @return taskgroup id
*/

View File

@ -48,6 +48,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@ -375,4 +376,82 @@ public class DataSchemaTest
)
);
}
@Test
public void testSerdeWithUpdatedDataSchemaAddedField() throws IOException
{
Map<String, Object> parser = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
null,
null
),
null
), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
DataSchema originalSchema = new DataSchema(
"test",
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
jsonMapper
);
String serialized = jsonMapper.writeValueAsString(originalSchema);
TestModifiedDataSchema deserialized = jsonMapper.readValue(serialized, TestModifiedDataSchema.class);
Assert.assertEquals(null, deserialized.getExtra());
Assert.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource());
Assert.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec());
Assert.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec());
Assert.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators());
Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
}
@Test
public void testSerdeWithUpdatedDataSchemaRemovedField() throws IOException
{
Map<String, Object> parser = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
null,
null
),
null
), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
TestModifiedDataSchema originalSchema = new TestModifiedDataSchema(
"test",
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
jsonMapper,
"some arbitrary string"
);
String serialized = jsonMapper.writeValueAsString(originalSchema);
DataSchema deserialized = jsonMapper.readValue(serialized, DataSchema.class);
Assert.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource());
Assert.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec());
Assert.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec());
Assert.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators());
Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import java.util.Map;
public class TestModifiedDataSchema extends DataSchema
{
private final String extra;
@JsonCreator
public TestModifiedDataSchema(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("parser") Map<String, Object> parser,
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("transformSpec") TransformSpec transformSpec,
@JacksonInject ObjectMapper jsonMapper,
@JsonProperty("extra") String extra
)
{
super(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper);
this.extra = extra;
}
@JsonProperty("extra")
public String getExtra()
{
return extra;
}
}