Support kinesis compatibility (#7351)

This commit is contained in:
Jihoon Son 2019-04-08 19:19:34 -07:00 committed by Jonathan Wei
parent 7cd5477658
commit e87d6e32b3
5 changed files with 386 additions and 39 deletions

View File

@ -41,8 +41,10 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility
@JsonProperty("baseSequenceName") String baseSequenceName, @JsonProperty("baseSequenceName") String baseSequenceName,
// startPartitions and endPartitions exist to be able to read old ioConfigs in metadata store // startPartitions and endPartitions exist to be able to read old ioConfigs in metadata store
@JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions, @JsonProperty("startPartitions") @Nullable
@JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions, @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions,
@JsonProperty("endPartitions") @Nullable
@Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
// startSequenceNumbers and endSequenceNumbers must be set for new versions // startSequenceNumbers and endSequenceNumbers must be set for new versions
@JsonProperty("startSequenceNumbers") @JsonProperty("startSequenceNumbers")
@Nullable SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers, @Nullable SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers,
@ -115,6 +117,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
* {@link SeekableStreamStartSequenceNumbers} didn't exist before. * {@link SeekableStreamStartSequenceNumbers} didn't exist before.
*/ */
@JsonProperty @JsonProperty
@Deprecated
public SeekableStreamEndSequenceNumbers<Integer, Long> getStartPartitions() public SeekableStreamEndSequenceNumbers<Integer, Long> getStartPartitions()
{ {
// Converting to start sequence numbers. This is allowed for Kafka because the start offset is always inclusive. // Converting to start sequence numbers. This is allowed for Kafka because the start offset is always inclusive.
@ -130,6 +133,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
* old version of Druid. * old version of Druid.
*/ */
@JsonProperty @JsonProperty
@Deprecated
public SeekableStreamEndSequenceNumbers<Integer, Long> getEndPartitions() public SeekableStreamEndSequenceNumbers<Integer, Long> getEndPartitions()
{ {
return getEndSequenceNumbers(); return getEndSequenceNumbers();

View File

@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbe
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Set;
public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String> public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String>
{ {
@ -46,6 +47,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
public KinesisIndexTaskIOConfig( public KinesisIndexTaskIOConfig(
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId, @JsonProperty("taskGroupId") @Nullable Integer taskGroupId,
@JsonProperty("baseSequenceName") String baseSequenceName, @JsonProperty("baseSequenceName") String baseSequenceName,
// below three deprecated variables exist to be able to read old ioConfigs in metadata store
@JsonProperty("startPartitions")
@Nullable
@Deprecated SeekableStreamEndSequenceNumbers<String, String> startPartitions,
@JsonProperty("endPartitions")
@Nullable
@Deprecated SeekableStreamEndSequenceNumbers<String, String> endPartitions,
@JsonProperty("exclusiveStartSequenceNumberPartitions")
@Nullable
@Deprecated Set<String> exclusiveStartSequenceNumberPartitions,
// startSequenceNumbers and endSequenceNumbers must be set for new versions
@JsonProperty("startSequenceNumbers") SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers, @JsonProperty("startSequenceNumbers") SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
@JsonProperty("endSequenceNumbers") SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers, @JsonProperty("endSequenceNumbers") SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
@JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("useTransaction") Boolean useTransaction,
@ -62,17 +74,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
super( super(
taskGroupId, taskGroupId,
baseSequenceName, baseSequenceName,
startSequenceNumbers, getStartSequenceNumbers(startSequenceNumbers, startPartitions, exclusiveStartSequenceNumberPartitions),
endSequenceNumbers, endSequenceNumbers == null ? endPartitions : endSequenceNumbers,
useTransaction, useTransaction,
minimumMessageTime, minimumMessageTime,
maximumMessageTime maximumMessageTime
); );
Preconditions.checkArgument( Preconditions.checkArgument(
endSequenceNumbers.getPartitionSequenceNumberMap() getEndSequenceNumbers().getPartitionSequenceNumberMap()
.values() .values()
.stream() .stream()
.noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)), .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)),
"End sequenceNumbers must not have the end of shard marker (EOS)" "End sequenceNumbers must not have the end of shard marker (EOS)"
); );
@ -84,6 +96,99 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
this.deaggregate = deaggregate; this.deaggregate = deaggregate;
} }
public KinesisIndexTaskIOConfig(
int taskGroupId,
String baseSequenceName,
SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
Boolean useTransaction,
DateTime minimumMessageTime,
DateTime maximumMessageTime,
String endpoint,
Integer recordsPerFetch,
Integer fetchDelayMillis,
String awsAssumedRoleArn,
String awsExternalId,
boolean deaggregate
)
{
this(
taskGroupId,
baseSequenceName,
null,
null,
null,
startSequenceNumbers,
endSequenceNumbers,
useTransaction,
minimumMessageTime,
maximumMessageTime,
endpoint,
recordsPerFetch,
fetchDelayMillis,
awsAssumedRoleArn,
awsExternalId,
deaggregate
);
}
private static SeekableStreamStartSequenceNumbers<String, String> getStartSequenceNumbers(
@Nullable SeekableStreamStartSequenceNumbers<String, String> newStartSequenceNumbers,
@Nullable SeekableStreamEndSequenceNumbers<String, String> oldStartSequenceNumbers,
@Nullable Set<String> exclusiveStartSequenceNumberPartitions
)
{
if (newStartSequenceNumbers == null) {
Preconditions.checkNotNull(
oldStartSequenceNumbers,
"Either startSequenceNumbers or startPartitions shoulnd't be null"
);
return new SeekableStreamStartSequenceNumbers<>(
oldStartSequenceNumbers.getStream(),
oldStartSequenceNumbers.getPartitionSequenceNumberMap(),
exclusiveStartSequenceNumberPartitions
);
} else {
return newStartSequenceNumbers;
}
}
/**
* This method is for compatibilty so that newer version of KafkaIndexTaskIOConfig can be read by
* old version of Druid. Note that this method returns end sequence numbers instead of start. This is because
* {@link SeekableStreamStartSequenceNumbers} didn't exist before.
*/
@JsonProperty
@Deprecated
public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
{
// Converting to start sequence numbers. This is allowed for Kafka because the start offset is always inclusive.
final SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers = getStartSequenceNumbers();
return new SeekableStreamEndSequenceNumbers<>(
startSequenceNumbers.getStream(),
startSequenceNumbers.getPartitionSequenceNumberMap()
);
}
/**
* This method is for compatibilty so that newer version of KafkaIndexTaskIOConfig can be read by
* old version of Druid.
*/
@JsonProperty
@Deprecated
public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions()
{
return getEndSequenceNumbers();
}
@JsonProperty
@Deprecated
public Set<String> getExclusiveStartSequenceNumberPartitions()
{
return getStartSequenceNumbers().getExclusivePartitions();
}
@JsonProperty @JsonProperty
public String getEndpoint() public String getEndpoint()
{ {

View File

@ -19,21 +19,31 @@
package org.apache.druid.indexing.kinesis; package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.indexing.IOConfig; import org.apache.druid.segment.indexing.IOConfig;
import org.hamcrest.CoreMatchers; import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Set;
public class KinesisIOConfigTest public class KinesisIOConfigTest
{ {
@ -243,4 +253,232 @@ public class KinesisIOConfigTest
exception.expectMessage(CoreMatchers.containsString("endpoint")); exception.expectMessage(CoreMatchers.containsString("endpoint"));
mapper.readValue(jsonStr, IOConfig.class); mapper.readValue(jsonStr, IOConfig.class);
} }
@Test
public void testDeserializeToOldIoConfig() throws IOException
{
final KinesisIndexTaskIOConfig currentConfig = new KinesisIndexTaskIOConfig(
0,
"baseSequenceName",
new SeekableStreamStartSequenceNumbers<>(
"stream",
ImmutableMap.of("1", "10L", "2", "5L"),
ImmutableSet.of("1")
),
new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")),
true,
DateTimes.nowUtc(),
DateTimes.nowUtc(),
"endpoint",
1000,
2000,
"awsAssumedRoleArn",
"awsExternalId",
true
);
final byte[] json = mapper.writeValueAsBytes(currentConfig);
final ObjectMapper oldMapper = new DefaultObjectMapper();
oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis"));
final OldKinesisIndexTaskIoConfig oldConfig = (OldKinesisIndexTaskIoConfig) oldMapper.readValue(
json,
IOConfig.class
);
Assert.assertEquals(currentConfig.getBaseSequenceName(), oldConfig.getBaseSequenceName());
Assert.assertEquals(
currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
oldConfig.getStartPartitions().getPartitionSequenceNumberMap()
);
Assert.assertEquals(
currentConfig.getStartSequenceNumbers().getExclusivePartitions(),
oldConfig.getExclusiveStartSequenceNumberPartitions()
);
Assert.assertEquals(currentConfig.getEndSequenceNumbers(), oldConfig.getEndPartitions());
Assert.assertEquals(currentConfig.isUseTransaction(), oldConfig.isUseTransaction());
Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime());
Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime());
Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint());
Assert.assertEquals(currentConfig.getRecordsPerFetch(), oldConfig.getRecordsPerFetch());
Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis());
Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn());
Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId());
Assert.assertEquals(currentConfig.isDeaggregate(), oldConfig.isDeaggregate());
}
@Test
public void testDeserializeFromOldIoConfig() throws IOException
{
final ObjectMapper oldMapper = new DefaultObjectMapper();
oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis"));
final OldKinesisIndexTaskIoConfig oldConfig = new OldKinesisIndexTaskIoConfig(
"baseSequenceName",
new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "10L", "2", "5L")),
new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")),
ImmutableSet.of("1"),
true,
DateTimes.nowUtc(),
DateTimes.nowUtc(),
"endpoint",
1000,
2000,
"awsAssumedRoleArn",
"awsExternalId",
true
);
final byte[] json = oldMapper.writeValueAsBytes(oldConfig);
final KinesisIndexTaskIOConfig currentConfig = (KinesisIndexTaskIOConfig) mapper.readValue(json, IOConfig.class);
Assert.assertNull(currentConfig.getTaskGroupId());
Assert.assertEquals(oldConfig.getBaseSequenceName(), currentConfig.getBaseSequenceName());
Assert.assertEquals(
oldConfig.getStartPartitions().getPartitionSequenceNumberMap(),
currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap()
);
Assert.assertEquals(
oldConfig.getExclusiveStartSequenceNumberPartitions(),
currentConfig.getStartSequenceNumbers().getExclusivePartitions()
);
Assert.assertEquals(oldConfig.getEndPartitions(), currentConfig.getEndSequenceNumbers());
Assert.assertEquals(oldConfig.isUseTransaction(), currentConfig.isUseTransaction());
Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime());
Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime());
Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint());
Assert.assertEquals(oldConfig.getRecordsPerFetch(), currentConfig.getRecordsPerFetch());
Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis());
Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn());
Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId());
Assert.assertEquals(oldConfig.isDeaggregate(), currentConfig.isDeaggregate());
}
private static class OldKinesisIndexTaskIoConfig implements IOConfig
{
private final String baseSequenceName;
private final SeekableStreamEndSequenceNumbers<String, String> startPartitions;
private final SeekableStreamEndSequenceNumbers<String, String> endPartitions;
private final Set<String> exclusiveStartSequenceNumberPartitions;
private final boolean useTransaction;
private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime;
private final String endpoint;
private final Integer recordsPerFetch;
private final Integer fetchDelayMillis;
private final String awsAssumedRoleArn;
private final String awsExternalId;
private final boolean deaggregate;
@JsonCreator
private OldKinesisIndexTaskIoConfig(
@JsonProperty("baseSequenceName") String baseSequenceName,
@JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> startPartitions,
@JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> endPartitions,
@JsonProperty("exclusiveStartSequenceNumberPartitions") Set<String> exclusiveStartSequenceNumberPartitions,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("endpoint") String endpoint,
@JsonProperty("recordsPerFetch") Integer recordsPerFetch,
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
@JsonProperty("deaggregate") boolean deaggregate
)
{
this.baseSequenceName = baseSequenceName;
this.startPartitions = startPartitions;
this.endPartitions = endPartitions;
this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions;
this.useTransaction = useTransaction;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
this.endpoint = endpoint;
this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis;
this.awsAssumedRoleArn = awsAssumedRoleArn;
this.awsExternalId = awsExternalId;
this.deaggregate = deaggregate;
}
@JsonProperty
public String getBaseSequenceName()
{
return baseSequenceName;
}
@JsonProperty
public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
{
return startPartitions;
}
@JsonProperty
public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions()
{
return endPartitions;
}
@JsonProperty
public Set<String> getExclusiveStartSequenceNumberPartitions()
{
return exclusiveStartSequenceNumberPartitions;
}
@JsonProperty
public boolean isUseTransaction()
{
return useTransaction;
}
@JsonProperty
public Optional<DateTime> getMinimumMessageTime()
{
return minimumMessageTime;
}
@JsonProperty
public Optional<DateTime> getMaximumMessageTime()
{
return maximumMessageTime;
}
@JsonProperty
public String getEndpoint()
{
return endpoint;
}
@JsonProperty
public int getRecordsPerFetch()
{
return recordsPerFetch;
}
@JsonProperty
public int getFetchDelayMillis()
{
return fetchDelayMillis;
}
@JsonProperty
public String getAwsAssumedRoleArn()
{
return awsAssumedRoleArn;
}
@JsonProperty
public String getAwsExternalId()
{
return awsExternalId;
}
@JsonProperty
public boolean isDeaggregate()
{
return deaggregate;
}
}
} }

View File

@ -399,7 +399,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -469,7 +469,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
@ -557,7 +557,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
baseSequenceName, baseSequenceName,
startPartitions, startPartitions,
endPartitions, endPartitions,
@ -683,7 +683,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
baseSequenceName, baseSequenceName,
startPartitions, startPartitions,
endPartitions, endPartitions,
@ -795,7 +795,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -864,7 +864,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -944,7 +944,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
) )
), ),
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -1015,7 +1015,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2")),
@ -1071,7 +1071,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -1140,7 +1140,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -1209,7 +1209,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@ -1268,7 +1268,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "12")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "12")),
@ -1366,7 +1366,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@ -1448,7 +1448,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask( final KinesisIndexTask task1 = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -1466,7 +1466,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask( final KinesisIndexTask task2 = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -1540,7 +1540,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask( final KinesisIndexTask task1 = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -1558,7 +1558,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask( final KinesisIndexTask task2 = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 1,
"sequence1", "sequence1",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@ -1630,7 +1630,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask( final KinesisIndexTask task1 = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -1648,7 +1648,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask( final KinesisIndexTask task2 = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 1,
"sequence1", "sequence1",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@ -1724,7 +1724,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence1", "sequence1",
new SeekableStreamStartSequenceNumbers<>( new SeekableStreamStartSequenceNumbers<>(
stream, stream,
@ -1808,7 +1808,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask( final KinesisIndexTask task1 = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -1826,7 +1826,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask( final KinesisIndexTask task2 = createTask(
null, null,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 1,
"sequence1", "sequence1",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
@ -1901,7 +1901,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask( final KinesisIndexTask task1 = createTask(
"task1", "task1",
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@ -1950,7 +1950,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask( final KinesisIndexTask task2 = createTask(
task1.getId(), task1.getId(),
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@ -2034,7 +2034,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask( final KinesisIndexTask task1 = createTask(
"task1", "task1",
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
@ -2095,7 +2095,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask( final KinesisIndexTask task2 = createTask(
task1.getId(), task1.getId(),
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
@ -2160,7 +2160,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask( final KinesisIndexTask task = createTask(
"task1", "task1",
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "13")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "13")),
@ -2283,7 +2283,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
"task1", "task1",
DATA_SCHEMA, DATA_SCHEMA,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@ -2380,7 +2380,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
ImmutableMap.of(shardId1, "100") // simulating unlimited ImmutableMap.of(shardId1, "100") // simulating unlimited
); );
final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig( final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig(
null, 0,
baseSequenceName, baseSequenceName,
startPartitions, startPartitions,
endPartitions, endPartitions,
@ -2493,7 +2493,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
"task1", "task1",
DATA_SCHEMA, DATA_SCHEMA,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequence0", "sequence0",
new SeekableStreamStartSequenceNumbers<>( new SeekableStreamStartSequenceNumbers<>(
stream, stream,

View File

@ -3550,7 +3550,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
getDataSchema(dataSource), getDataSchema(dataSource),
tuningConfig, tuningConfig,
new KinesisIndexTaskIOConfig( new KinesisIndexTaskIOConfig(
null, 0,
"sequenceName-" + taskGroupId, "sequenceName-" + taskGroupId,
startPartitions, startPartitions,
endPartitions, endPartitions,