Kinesis Input Format for timestamp, and payload parsing (#16813)

* SQL syntax error should target USER persona

* * revert change to queryHandler and related tests, based on review comments

* * add test

* Introduce KinesisRecordEntity to support Kinesis headers in InputFormats

* * add kinesisInputFormat and Reader, and tests

* * bind KinesisInputFormat class to module

* * improve test coverage

* * remove references to kafka

* * resolve review comments

* * remove comment

* * fix grammer of comment

* * fix comment again

* * fix comment again

* * more review comments

* * add partitionKey

* * add check for same timestamp and partitionKey column name

* * fix intellij inspection
This commit is contained in:
zachjsh 2024-08-02 08:48:44 -04:00 committed by GitHub
parent 63ba5a4113
commit 9b731e8f0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1534 additions and 121 deletions

View File

@ -33,8 +33,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
* key, and timestamp. * key, and timestamp.
* <p> * <p>
* NOTE: Any records with null values will be skipped, even if they contain non-null keys, or headers * NOTE: Any records with null values will be skipped, even if they contain non-null keys, or headers
* <p>
* This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.
*/ */
public class KafkaRecordEntity extends ByteEntity public class KafkaRecordEntity extends ByteEntity
{ {

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.kinesis;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.DateTimes;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Objects;
/**
* Kinesis aware InputFormat. Allows for reading kinesis specific values that are stored in the {@link Record}. At
* this time, this input format only supports reading data from the following record components
* <p>
* - {@link Record#data}
* - {@link Record#approximateArrivalTimestamp}
* - {@link Record#partitionKey}
* <p>
* This class can be extended easily to read other fields available in the kinesis record.
*/
public class KinesisInputFormat implements InputFormat
{
private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp";
private static final String DEFAULT_PARTITION_KEY_COLUMN_NAME = "kinesis.partitionKey";
// Since KinesisInputFormat blends data from record properties, and payload, timestamp spec can be pointing to an
// attribute within one of these 2 sections. To handle scenarios where there is no timestamp value in the payload, we
// induce an artificial timestamp value to avoid unnecessary parser barf out. Users in such situations can use the
// inputFormat's kinesis record timestamp as its primary timestamp.
public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";
private final TimestampSpec dummyTimestampSpec = new TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH);
private final InputFormat valueFormat;
private final String timestampColumnName;
private final String partitionKeyColumnName;
public KinesisInputFormat(
@JsonProperty("valueFormat") InputFormat valueFormat,
@JsonProperty("partitionKeyColumnName") @Nullable String partitionKeyColumnName,
@JsonProperty("timestampColumnName") @Nullable String timestampColumnName
)
{
this.valueFormat = Preconditions.checkNotNull(valueFormat, "valueFormat must not be null");
Preconditions.checkState(
!(timestampColumnName != null && timestampColumnName.equals(partitionKeyColumnName)),
"timestampColumnName and partitionKeyColumnName must be different"
);
this.partitionKeyColumnName = partitionKeyColumnName != null
? partitionKeyColumnName
: DEFAULT_PARTITION_KEY_COLUMN_NAME;
this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
}
@Override
public boolean isSplittable()
{
return false;
}
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
final SettableByteEntity<KinesisRecordEntity> settableByteEntitySource;
if (source instanceof SettableByteEntity) {
settableByteEntitySource = (SettableByteEntity<KinesisRecordEntity>) source;
} else {
settableByteEntitySource = new SettableByteEntity<>();
settableByteEntitySource.setEntity((KinesisRecordEntity) source);
}
InputRowSchema newInputRowSchema = new InputRowSchema(
dummyTimestampSpec,
inputRowSchema.getDimensionsSpec(),
inputRowSchema.getColumnsFilter(),
inputRowSchema.getMetricNames()
);
return new KinesisInputReader(
inputRowSchema,
settableByteEntitySource,
JsonInputFormat.withLineSplittable(valueFormat, false).createReader(
newInputRowSchema,
source,
temporaryDirectory
),
partitionKeyColumnName,
timestampColumnName
);
}
@JsonProperty
public InputFormat getValueFormat()
{
return valueFormat;
}
@Nullable
@JsonProperty
public String getTimestampColumnName()
{
return timestampColumnName;
}
@Nullable
@JsonProperty
public String getPartitionKeyColumnName()
{
return partitionKeyColumnName;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KinesisInputFormat that = (KinesisInputFormat) o;
return Objects.equals(valueFormat, that.valueFormat)
&& Objects.equals(timestampColumnName, that.timestampColumnName)
&& Objects.equals(partitionKeyColumnName, that.partitionKeyColumnName);
}
@Override
public int hashCode()
{
return Objects.hash(valueFormat, timestampColumnName, partitionKeyColumnName);
}
}

View File

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.kinesis;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
public class KinesisInputReader implements InputEntityReader
{
private final InputRowSchema inputRowSchema;
private final SettableByteEntity<KinesisRecordEntity> source;
private final InputEntityReader valueParser;
private final String partitionKeyColumnName;
private final String timestampColumnName;
public KinesisInputReader(
InputRowSchema inputRowSchema,
SettableByteEntity<KinesisRecordEntity> source,
InputEntityReader valueParser,
String partitionKeyColumnName,
String timestampColumnName
)
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.valueParser = valueParser;
this.partitionKeyColumnName = partitionKeyColumnName;
this.timestampColumnName = timestampColumnName;
}
@Override
public CloseableIterator<InputRow> read() throws IOException
{
final KinesisRecordEntity record = source.getEntity();
final Map<String, Object> mergedHeaderMap = extractHeaders(record);
if (record.getRecord().getData() != null) {
return buildBlendedRows(valueParser, mergedHeaderMap);
} else {
return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
}
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
final KinesisRecordEntity record = source.getEntity();
InputRowListPlusRawValues headers = extractHeaderSample(record);
if (record.getRecord().getData() != null) {
return buildBlendedRowsSample(valueParser, headers.getRawValues());
} else {
final List<InputRowListPlusRawValues> rows = Collections.singletonList(headers);
return CloseableIterators.withEmptyBaggage(rows.iterator());
}
}
private Map<String, Object> extractHeaders(KinesisRecordEntity record)
{
final Map<String, Object> mergedHeaderMap = new HashMap<>();
mergedHeaderMap.put(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime());
mergedHeaderMap.put(partitionKeyColumnName, record.getRecord().getPartitionKey());
return mergedHeaderMap;
}
private CloseableIterator<InputRow> buildBlendedRows(
InputEntityReader valueParser,
Map<String, Object> headerKeyList
) throws IOException
{
return valueParser.read().map(
r -> {
final HashSet<String> newDimensions = new HashSet<>(r.getDimensions());
final Map<String, Object> event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList);
newDimensions.addAll(headerKeyList.keySet());
// Remove the dummy timestamp added in KinesisInputFormat
newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event);
return new MapBasedInputRow(
timestamp,
MapInputRowParser.findDimensions(
inputRowSchema.getTimestampSpec(),
inputRowSchema.getDimensionsSpec(),
newDimensions
),
event
);
}
);
}
private InputRowListPlusRawValues extractHeaderSample(KinesisRecordEntity record)
{
Map<String, Object> mergedHeaderMap = extractHeaders(record);
return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
}
private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
InputEntityReader valueParser,
Map<String, Object> headerKeyList
) throws IOException
{
return valueParser.sample().map(
rowAndValues -> {
if (rowAndValues.getParseException() != null) {
return rowAndValues;
}
List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
for (Map<String, Object> raw : rowAndValues.getRawValuesList()) {
newRawRows.add(buildBlendedEventMap(raw::get, raw.keySet(), headerKeyList));
}
for (InputRow r : rowAndValues.getInputRows()) {
if (r != null) {
final HashSet<String> newDimensions = new HashSet<>(r.getDimensions());
final Map<String, Object> event = buildBlendedEventMap(
r::getRaw,
newDimensions,
headerKeyList
);
newDimensions.addAll(headerKeyList.keySet());
// Remove the dummy timestamp added in KinesisInputFormat
newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
newInputRows.add(
new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(event),
MapInputRowParser.findDimensions(
inputRowSchema.getTimestampSpec(),
inputRowSchema.getDimensionsSpec(),
newDimensions
),
event
)
);
}
}
return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, null);
}
);
}
private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
{
return Collections.singletonList(
new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
MapInputRowParser.findDimensions(
inputRowSchema.getTimestampSpec(),
inputRowSchema.getDimensionsSpec(),
headerKeyList.keySet()
),
headerKeyList
)
);
}
private Map<String, Object> buildBlendedEventMap(
Function<String, Object> getRowValue,
Set<String> rowDimensions,
Map<String, Object> fallback
)
{
final Set<String> keySet = new HashSet<>(fallback.keySet());
keySet.addAll(rowDimensions);
return new AbstractMap<String, Object>()
{
@Override
public Object get(Object key)
{
final String skey = (String) key;
final Object val = getRowValue.apply(skey);
if (val == null) {
return fallback.get(skey);
}
return val;
}
@Override
public Set<String> keySet()
{
return keySet;
}
@Override
public Set<Entry<String, Object>> entrySet()
{
return keySet().stream()
.map(
field -> new Entry<String, Object>()
{
@Override
public String getKey()
{
return field;
}
@Override
public Object getValue()
{
return get(field);
}
@Override
public Object setValue(final Object value)
{
throw new UnsupportedOperationException();
}
}
)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
};
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.kinesis;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
/**
* A {@link ByteEntity} generated by {@link KinesisRecordSupplier} and fed to any {@link InputFormat} used by kinesis
* indexing tasks.
* <p>
* It can be used as a regular ByteEntity, in which case the kinesis record value is returned, but the {@link #getRecord}
* method also allows Kinesis-aware {@link InputFormat} implementations to read the full kinesis record, including
* timestamp, encrytion key, patition key, and sequence number
* <p>
* NOTE: Any records with null values will be returned as records with just only kinesis properties and no data payload
*/
public class KinesisRecordEntity extends ByteEntity
{
private final Record record;
public KinesisRecordEntity(Record record)
{
super(record.getData());
this.record = record;
}
public Record getRecord()
{
return record;
}
}

View File

@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.TaskResource;
@ -46,7 +46,7 @@ import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity> public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, KinesisRecordEntity>
{ {
private static final String TYPE = "index_kinesis"; private static final String TYPE = "index_kinesis";
@ -100,7 +100,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
} }
@Override @Override
protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner() protected SeekableStreamIndexTaskRunner<String, String, KinesisRecordEntity> createTaskRunner()
{ {
//noinspection unchecked //noinspection unchecked
return new KinesisIndexTaskRunner( return new KinesisIndexTaskRunner(

View File

@ -21,8 +21,8 @@ package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
@ -49,7 +49,7 @@ import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, ByteEntity> public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, KinesisRecordEntity>
{ {
private static final EmittingLogger log = new EmittingLogger(KinesisIndexTaskRunner.class); private static final EmittingLogger log = new EmittingLogger(KinesisIndexTaskRunner.class);
private static final long POLL_TIMEOUT = 100; private static final long POLL_TIMEOUT = 100;
@ -81,8 +81,8 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
@Nonnull @Nonnull
@Override @Override
protected List<OrderedPartitionableRecord<String, String, ByteEntity>> getRecords( protected List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> getRecords(
RecordSupplier<String, String, ByteEntity> recordSupplier, TaskToolbox toolbox RecordSupplier<String, String, KinesisRecordEntity> recordSupplier, TaskToolbox toolbox
) )
{ {
return recordSupplier.poll(POLL_TIMEOUT); return recordSupplier.poll(POLL_TIMEOUT);
@ -119,7 +119,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
@Override @Override
protected void possiblyResetDataSourceMetadata( protected void possiblyResetDataSourceMetadata(
TaskToolbox toolbox, TaskToolbox toolbox,
RecordSupplier<String, String, ByteEntity> recordSupplier, RecordSupplier<String, String, KinesisRecordEntity> recordSupplier,
Set<StreamPartition<String>> assignment Set<StreamPartition<String>> assignment
) )
{ {

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.name.Names; import com.google.inject.name.Names;
import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.data.input.kinesis.KinesisInputFormat;
import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
@ -50,7 +51,8 @@ public class KinesisIndexingServiceModule implements DruidModule
new NamedType(KinesisIndexTaskIOConfig.class, SCHEME), new NamedType(KinesisIndexTaskIOConfig.class, SCHEME),
new NamedType(KinesisSupervisorTuningConfig.class, SCHEME), new NamedType(KinesisSupervisorTuningConfig.class, SCHEME),
new NamedType(KinesisSupervisorSpec.class, SCHEME), new NamedType(KinesisSupervisorSpec.class, SCHEME),
new NamedType(KinesisSamplerSpec.class, SCHEME) new NamedType(KinesisSamplerSpec.class, SCHEME),
new NamedType(KinesisInputFormat.class, SCHEME)
) )
); );
} }

View File

@ -26,6 +26,7 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsRequest;
@ -49,7 +50,7 @@ import com.google.common.collect.Maps;
import org.apache.druid.common.aws.AWSClientUtil; import org.apache.druid.common.aws.AWSClientUtil;
import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils; import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@ -69,7 +70,6 @@ import javax.annotation.Nullable;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -94,7 +94,7 @@ import java.util.stream.Collectors;
* This class implements a local buffer for storing fetched Kinesis records. Fetching is done * This class implements a local buffer for storing fetched Kinesis records. Fetching is done
* in background threads. * in background threads.
*/ */
public class KinesisRecordSupplier implements RecordSupplier<String, String, ByteEntity> public class KinesisRecordSupplier implements RecordSupplier<String, String, KinesisRecordEntity>
{ {
private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class); private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class);
private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000; private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000;
@ -210,7 +210,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
// used for retrying on InterruptedException // used for retrying on InterruptedException
GetRecordsResult recordsResult = null; GetRecordsResult recordsResult = null;
OrderedPartitionableRecord<String, String, ByteEntity> currRecord; OrderedPartitionableRecord<String, String, KinesisRecordEntity> currRecord;
long recordBufferOfferWaitMillis; long recordBufferOfferWaitMillis;
try { try {
@ -248,7 +248,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
// list will come back empty if there are no records // list will come back empty if there are no records
for (Record kinesisRecord : recordsResult.getRecords()) { for (Record kinesisRecord : recordsResult.getRecords()) {
final List<ByteEntity> data; final List<KinesisRecordEntity> data;
if (deaggregateHandle == null || getDataHandle == null) { if (deaggregateHandle == null || getDataHandle == null) {
throw new ISE("deaggregateHandle or getDataHandle is null!"); throw new ISE("deaggregateHandle or getDataHandle is null!");
@ -256,15 +256,15 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
data = new ArrayList<>(); data = new ArrayList<>();
final List userRecords = (List) deaggregateHandle.invokeExact( final List<UserRecord> userRecords = (List<UserRecord>) deaggregateHandle.invokeExact(
Collections.singletonList(kinesisRecord) Collections.singletonList(kinesisRecord)
); );
int recordSize = 0; int recordSize = 0;
for (Object userRecord : userRecords) { for (UserRecord userRecord : userRecords) {
ByteEntity byteEntity = new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord)); KinesisRecordEntity kinesisRecordEntity = new KinesisRecordEntity(userRecord);
recordSize += byteEntity.getBuffer().array().length; recordSize += kinesisRecordEntity.getBuffer().array().length;
data.add(byteEntity); data.add(kinesisRecordEntity);
} }
@ -408,7 +408,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources = private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> records; private MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> records;
private final boolean backgroundFetchEnabled; private final boolean backgroundFetchEnabled;
private volatile boolean closed = false; private volatile boolean closed = false;
@ -615,12 +615,12 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
@Nonnull @Nonnull
@Override @Override
public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long timeout) public List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> poll(long timeout)
{ {
start(); start();
try { try {
List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, ByteEntity>>> polledRecords = new ArrayList<>(); List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, KinesisRecordEntity>>> polledRecords = new ArrayList<>();
records.drain( records.drain(
polledRecords, polledRecords,
@ -1040,7 +1040,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
} }
// filter records in buffer and only retain ones whose partition was not seeked // filter records in buffer and only retain ones whose partition was not seeked
MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> newQ =
new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes); new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
records.stream() records.stream()

View File

@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils; import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata; import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
@ -74,7 +74,7 @@ import java.util.stream.Collectors;
* tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of * tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of
* Kinesis sequences. * Kinesis sequences.
*/ */
public class KinesisSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity> public class KinesisSupervisor extends SeekableStreamSupervisor<String, String, KinesisRecordEntity>
{ {
private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class); private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class);
@ -150,7 +150,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
} }
@Override @Override
protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks( protected List<SeekableStreamIndexTask<String, String, KinesisRecordEntity>> createIndexTasks(
int replicas, int replicas,
String baseSequenceName, String baseSequenceName,
ObjectMapper sortingMapper, ObjectMapper sortingMapper,
@ -164,7 +164,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
final Map<String, Object> context = createBaseTaskContexts(); final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints); context.put(CHECKPOINTS_CTX_KEY, checkpoints);
List<SeekableStreamIndexTask<String, String, ByteEntity>> taskList = new ArrayList<>(); List<SeekableStreamIndexTask<String, String, KinesisRecordEntity>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) { for (int i = 0; i < replicas; i++) {
String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName); String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KinesisIndexTask( taskList.add(new KinesisIndexTask(
@ -183,7 +183,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
@Override @Override
protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier() throws RuntimeException protected RecordSupplier<String, String, KinesisRecordEntity> setupRecordSupplier() throws RuntimeException
{ {
KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); KinesisSupervisorIOConfig ioConfig = spec.getIoConfig();
KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig(); KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig();

View File

@ -0,0 +1,940 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.kinesis;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
public class KinesisInputFormatTest
{
static {
NullHandling.initializeForTests();
}
private static final String KINESIS_APPROXIMATE_TIME_DATE = "2024-07-29";
private static final long KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS = DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis();
private static final String DATA_TIMSTAMP_DATE = "2024-07-30";
private static final String PARTITION_KEY = "partition_key_1";
private static final byte[] SIMPLE_JSON_VALUE_BYTES = StringUtils.toUtf8(
TestUtils.singleQuoteToStandardJson(
"{"
+ " 'timestamp': '" + DATA_TIMSTAMP_DATE + "',"
+ " 'bar': null,"
+ " 'foo': 'x',"
+ " 'baz': 4,"
+ " 'o': {'mg': 1}"
+ "}"
)
);
private KinesisInputFormat format;
@Before
public void setUp()
{
format = new KinesisInputFormat(
// Value Format
new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false,
false
),
"kinesis.newts.partitionKey",
"kinesis.newts.timestamp"
);
}
@Test
public void testSerde() throws JsonProcessingException
{
final ObjectMapper mapper = new ObjectMapper();
KinesisInputFormat kif = new KinesisInputFormat(
// Value Format
new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false,
false
),
"kinesis.newts.partitionKey",
"kinesis.newts.timestamp"
);
Assert.assertEquals(format, kif);
final byte[] formatBytes = mapper.writeValueAsBytes(format);
final byte[] kifBytes = mapper.writeValueAsBytes(kif);
Assert.assertArrayEquals(formatBytes, kifBytes);
}
@Test
public void testTimestampFromHeader() throws IOException
{
KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("kinesis.newts.timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo"
)
)
),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
// Payload verifications
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal
Assert.assertEquals(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE), row.getTimestamp());
Assert.assertEquals(
String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testRawSample() throws IOException
{
KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo"
)
)
),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRowListPlusRawValues rawValues = iterator.next();
Assert.assertEquals(1, rawValues.getInputRows().size());
InputRow row = rawValues.getInputRows().get(0);
// Payload verifications
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal
Assert.assertEquals(
String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testProcessesSampleTimestampFromHeader() throws IOException
{
KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("kinesis.newts.timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo"
)
)
),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRowListPlusRawValues rawValues = iterator.next();
Assert.assertEquals(1, rawValues.getInputRows().size());
InputRow row = rawValues.getInputRows().get(0);
// Payload verifications
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal
Assert.assertEquals(DateTimes.of(String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS)), row.getTimestamp());
Assert.assertEquals(
String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testWithMultipleMixedRecordsTimestampFromHeader() throws IOException
{
final byte[][] values = new byte[5][];
for (int i = 0; i < values.length; i++) {
values[i] = StringUtils.toUtf8(
"{\n"
+ " \"timestamp\": \"2024-07-2" + i + "\",\n"
+ " \"bar\": null,\n"
+ " \"foo\": \"x\",\n"
+ " \"baz\": 4,\n"
+ " \"index\": " + i + ",\n"
+ " \"o\": {\n"
+ " \"mg\": 1\n"
+ " }\n"
+ "}"
);
}
SettableByteEntity<KinesisRecordEntity> settableByteEntity = new SettableByteEntity<>();
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("kinesis.newts.timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo",
"kinesis.newts.timestamp"
)
)
),
ColumnsFilter.all()
),
settableByteEntity,
null
);
for (int i = 0; i < values.length; i++) {
KinesisRecordEntity inputEntity = makeInputEntity(values[i], DateTimes.of("2024-07-1" + i).getMillis());
settableByteEntity.setEntity(inputEntity);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
// Payload verification
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal
Assert.assertEquals(DateTimes.of("2024-07-1" + i), row.getTimestamp());
Assert.assertEquals(
String.valueOf(DateTimes.of("2024-07-1" + i).getMillis()),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index")));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
}
@Test
public void testTimestampFromData() throws IOException
{
KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo"
)
)
),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
// Payload verifications
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal
Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp());
Assert.assertEquals(
String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testWithMultipleMixedRecordsTimestampFromData() throws IOException
{
final byte[][] values = new byte[5][];
for (int i = 0; i < values.length; i++) {
values[i] = StringUtils.toUtf8(
"{\n"
+ " \"timestamp\": \"2024-07-2" + i + "\",\n"
+ " \"bar\": null,\n"
+ " \"foo\": \"x\",\n"
+ " \"baz\": 4,\n"
+ " \"index\": " + i + ",\n"
+ " \"o\": {\n"
+ " \"mg\": 1\n"
+ " }\n"
+ "}"
);
}
SettableByteEntity<KinesisRecordEntity> settableByteEntity = new SettableByteEntity<>();
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo",
"kinesis.newts.timestamp"
)
)
),
ColumnsFilter.all()
),
settableByteEntity,
null
);
for (int i = 0; i < values.length; i++) {
KinesisRecordEntity inputEntity = makeInputEntity(values[i], KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
settableByteEntity.setEntity(inputEntity);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
// Payload verification
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal
Assert.assertEquals(DateTimes.of("2024-07-2" + i), row.getTimestamp());
Assert.assertEquals(
String.valueOf(DateTimes.of("2024-07-29").getMillis()),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index")));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
}
@Test
public void testMissingTimestampThrowsException() throws IOException
{
KinesisRecordEntity inputEntity =
makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("time", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo",
"kinesis.newts.timestamp"
)
)
),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
Throwable t = Assert.assertThrows(ParseException.class, iterator::next);
Assert.assertTrue(
t.getMessage().startsWith("Timestamp[null] is unparseable! Event: {")
);
}
}
}
@Test
public void testWithSchemaDiscoveryKinesisTimestampExcluded() throws IOException
{
KinesisRecordEntity inputEntity =
makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
DimensionsSpec.builder()
.useSchemaDiscovery(true)
.setDimensionExclusions(ImmutableList.of("kinesis.newts.timestamp"))
.build(),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
List<String> expectedDimensions = Arrays.asList(
"foo",
"root_baz",
"o",
"bar",
"path_omg",
"jq_omg",
"jq_omg2",
"baz",
"root_baz2",
"path_omg2",
"kinesis.newts.partitionKey"
);
Collections.sort(expectedDimensions);
Collections.sort(row.getDimensions());
Assert.assertEquals(
expectedDimensions,
row.getDimensions()
);
// Payload verifications
Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp());
Assert.assertEquals(
String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testWithSchemaDiscoveryTimestampFromHeader() throws IOException
{
KinesisRecordEntity inputEntity =
makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("kinesis.newts.timestamp", "iso", null),
DimensionsSpec.builder()
.useSchemaDiscovery(true)
.build(),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
List<String> expectedDimensions = Arrays.asList(
"foo",
"timestamp",
"root_baz",
"o",
"bar",
"path_omg",
"jq_omg",
"jq_omg2",
"baz",
"root_baz2",
"path_omg2",
"kinesis.newts.partitionKey"
);
Collections.sort(expectedDimensions);
Collections.sort(row.getDimensions());
Assert.assertEquals(
expectedDimensions,
row.getDimensions()
);
// Payload verifications
Assert.assertEquals(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE), row.getTimestamp());
Assert.assertEquals(
String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testValueInCsvFormat() throws IOException
{
format = new KinesisInputFormat(
// Value Format
new CsvInputFormat(
Arrays.asList("foo", "bar", "timestamp", "baz"),
null,
false,
false,
0
),
"kinesis.newts.partitionKey",
"kinesis.newts.timestamp"
);
KinesisRecordEntity inputEntity =
makeInputEntity(StringUtils.toUtf8("x,,2024-07-30,4"), KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo",
"kinesis.newts.timestamp",
"kinesis.newts.partitionKey"
)
)
),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(
Arrays.asList(
"bar",
"foo",
"kinesis.newts.timestamp",
"kinesis.newts.partitionKey"
),
row.getDimensions()
);
// Payload verifications
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal
Assert.assertEquals(DateTimes.of("2024-07-30"), row.getTimestamp());
Assert.assertEquals(
String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertTrue(row.getDimension("bar").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testWithPartialDeclarationSchemaDiscovery() throws IOException
{
KinesisRecordEntity inputEntity =
makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
DimensionsSpec.builder().setDimensions(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar"))
).useSchemaDiscovery(true).build(),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
List<String> expectedDimensions = Arrays.asList(
"bar",
"foo",
"kinesis.newts.timestamp",
"kinesis.newts.partitionKey",
"root_baz",
"o",
"path_omg",
"jq_omg",
"jq_omg2",
"baz",
"root_baz2",
"path_omg2"
);
Collections.sort(expectedDimensions);
Collections.sort(row.getDimensions());
Assert.assertEquals(
expectedDimensions,
row.getDimensions()
);
// Payload verifications
Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp());
Assert.assertEquals(
String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
@SuppressWarnings("ResultOfMethodCallIgnored")
public void testValidInputFormatConstruction()
{
InputFormat valueFormat = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false,
false
);
// null partitionKeyColumnName and null timestampColumnName is valid
new KinesisInputFormat(valueFormat, null, null);
// non-null partitionKeyColumnName and null timestampColumnName is valid
new KinesisInputFormat(valueFormat, "kinesis.partitionKey", null);
// null partitionKeyColumnName and non-null timestampColumnName is valid
new KinesisInputFormat(valueFormat, null, "kinesis.timestamp");
// non-null partitionKeyColumnName and non-null timestampColumnName is valid
new KinesisInputFormat(valueFormat, "kinesis.partitionKey", "kinesis.timestamp");
}
@Test
@SuppressWarnings("ResultOfMethodCallIgnored")
public void testInvalidInputFormatConstruction()
{
// null value format is invalid
Assert.assertThrows(
"valueFormat must not be null",
NullPointerException.class,
() -> new KinesisInputFormat(null, null, null)
);
InputFormat valueFormat = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false,
false
);
// partitionKeyColumnName == timestampColumnName is invalid
Assert.assertThrows(
"timestampColumnName and partitionKeyColumnName must be different",
IllegalStateException.class,
() -> new KinesisInputFormat(valueFormat, "kinesis.timestamp", "kinesis.timestamp")
);
}
private KinesisRecordEntity makeInputEntity(
byte[] payload,
long kinesisTimestampMillis)
{
return new KinesisRecordEntity(
new Record().withData(ByteBuffer.wrap(payload))
.withApproximateArrivalTimestamp(new Date(kinesisTimestampMillis))
.withPartitionKey(PARTITION_KEY)
);
}
private SettableByteEntity<KinesisRecordEntity> newSettableByteEntity(KinesisRecordEntity kinesisRecordEntity)
{
SettableByteEntity<KinesisRecordEntity> settableByteEntity = new SettableByteEntity<>();
settableByteEntity.setEntity(kinesisRecordEntity);
return settableByteEntity;
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.kinesis; package org.apache.druid.indexing.kinesis;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
@ -41,6 +42,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@ -127,39 +129,39 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
private static final String SHARD_ID0 = "0"; private static final String SHARD_ID0 = "0";
private static final List<KinesisRecord> RECORDS = Arrays.asList( private static final List<KinesisRecord> RECORDS = Arrays.asList(
createRecord("1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), createRecord("1", "0", kjb("2008", "a", "y", "10", "20.0", "1.0")),
createRecord("1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), createRecord("1", "1", kjb("2009", "b", "y", "10", "20.0", "1.0")),
createRecord("1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), createRecord("1", "2", kjb("2010", "c", "y", "10", "20.0", "1.0")),
createRecord("1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), createRecord("1", "3", kjb("2011", "d", "y", "10", "20.0", "1.0")),
createRecord("1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), createRecord("1", "4", kjb("2011", "e", "y", "10", "20.0", "1.0")),
createRecord("1", "5", jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), createRecord("1", "5", kjb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
createRecord("1", "6", new ByteEntity(StringUtils.toUtf8("unparseable"))), createRecord("1", "6", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("unparseable")).getBuffer()))),
createRecord("1", "7", new ByteEntity(StringUtils.toUtf8(""))), createRecord("1", "7", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("")).getBuffer()))),
createRecord("1", "8", new ByteEntity(StringUtils.toUtf8("{}"))), createRecord("1", "8", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("{}")).getBuffer()))),
createRecord("1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")), createRecord("1", "9", kjb("2013", "f", "y", "10", "20.0", "1.0")),
createRecord("1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")), createRecord("1", "10", kjb("2049", "f", "y", "notanumber", "20.0", "1.0")),
createRecord("1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")), createRecord("1", "11", kjb("2049", "f", "y", "10", "notanumber", "1.0")),
createRecord("1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")), createRecord("1", "12", kjb("2049", "f", "y", "10", "20.0", "notanumber")),
createRecord("0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")), createRecord("0", "0", kjb("2012", "g", "y", "10", "20.0", "1.0")),
createRecord("0", "1", jb("2011", "h", "y", "10", "20.0", "1.0")) createRecord("0", "1", kjb("2011", "h", "y", "10", "20.0", "1.0"))
); );
private static final List<KinesisRecord> SINGLE_PARTITION_RECORDS = Arrays.asList( private static final List<KinesisRecord> SINGLE_PARTITION_RECORDS = Arrays.asList(
createRecord("1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), createRecord("1", "0", kjb("2008", "a", "y", "10", "20.0", "1.0")),
createRecord("1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), createRecord("1", "1", kjb("2009", "b", "y", "10", "20.0", "1.0")),
createRecord("1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), createRecord("1", "2", kjb("2010", "c", "y", "10", "20.0", "1.0")),
createRecord("1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), createRecord("1", "3", kjb("2011", "d", "y", "10", "20.0", "1.0")),
createRecord("1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), createRecord("1", "4", kjb("2011", "e", "y", "10", "20.0", "1.0")),
createRecord("1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")), createRecord("1", "5", kjb("2012", "a", "y", "10", "20.0", "1.0")),
createRecord("1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")), createRecord("1", "6", kjb("2013", "b", "y", "10", "20.0", "1.0")),
createRecord("1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")), createRecord("1", "7", kjb("2010", "c", "y", "10", "20.0", "1.0")),
createRecord("1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")), createRecord("1", "8", kjb("2011", "d", "y", "10", "20.0", "1.0")),
createRecord("1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")), createRecord("1", "9", kjb("2011", "e", "y", "10", "20.0", "1.0")),
createRecord("1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")), createRecord("1", "10", kjb("2008", "a", "y", "10", "20.0", "1.0")),
createRecord("1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")), createRecord("1", "11", kjb("2009", "b", "y", "10", "20.0", "1.0")),
createRecord("1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")), createRecord("1", "12", kjb("2010", "c", "y", "10", "20.0", "1.0")),
createRecord("1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")), createRecord("1", "13", kjb("2012", "d", "y", "10", "20.0", "1.0")),
createRecord("1", "14", jb("2013", "e", "y", "10", "20.0", "1.0")) createRecord("1", "14", kjb("2013", "e", "y", "10", "20.0", "1.0"))
); );
private static KinesisRecordSupplier recordSupplier; private static KinesisRecordSupplier recordSupplier;
@ -272,12 +274,12 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
record.getPartitionId(), record.getPartitionId(),
record.getSequenceNumber(), record.getSequenceNumber(),
record.getData().stream() record.getData().stream()
.map(entity -> new ByteEntity(entity.getBuffer())) .map(entity -> new KinesisRecordEntity(new Record().withData(entity.getBuffer())))
.collect(Collectors.toList()) .collect(Collectors.toList())
); );
} }
private static List<OrderedPartitionableRecord<String, String, ByteEntity>> clone( private static List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> clone(
List<KinesisRecord> records, List<KinesisRecord> records,
int start, int start,
int end int end
@ -289,14 +291,14 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
/** /**
* Records can only be read once, hence we must use fresh records every time. * Records can only be read once, hence we must use fresh records every time.
*/ */
private static List<OrderedPartitionableRecord<String, String, ByteEntity>> clone( private static List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> clone(
List<KinesisRecord> records List<KinesisRecord> records
) )
{ {
return records.stream().map(KinesisIndexTaskTest::clone).collect(Collectors.toList()); return records.stream().map(KinesisIndexTaskTest::clone).collect(Collectors.toList());
} }
private static KinesisRecord createRecord(String partitionId, String sequenceNumber, ByteEntity entity) private static KinesisRecord createRecord(String partitionId, String sequenceNumber, KinesisRecordEntity entity)
{ {
return new KinesisRecord(STREAM, partitionId, sequenceNumber, Collections.singletonList(entity)); return new KinesisRecord(STREAM, partitionId, sequenceNumber, Collections.singletonList(entity));
} }
@ -1697,7 +1699,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
maxRowsPerSegment = 2; maxRowsPerSegment = 2;
maxRecordsPerPoll = 1; maxRecordsPerPoll = 1;
maxBytesPerPoll = 1_000_000; maxBytesPerPoll = 1_000_000;
List<OrderedPartitionableRecord<String, String, ByteEntity>> records = List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> records =
clone(SINGLE_PARTITION_RECORDS); clone(SINGLE_PARTITION_RECORDS);
recordSupplier.assign(EasyMock.anyObject()); recordSupplier.assign(EasyMock.anyObject());
@ -2148,7 +2150,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes(); EasyMock.expectLastCall().anyTimes();
List<OrderedPartitionableRecord<String, String, ByteEntity>> eosRecord = ImmutableList.of( List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> eosRecord = ImmutableList.of(
new OrderedPartitionableRecord<>(STREAM, SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER, null) new OrderedPartitionableRecord<>(STREAM, SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER, null)
); );
@ -2454,6 +2456,18 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING; return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING;
} }
private static KinesisRecordEntity kjb(
String timestamp,
String dim1,
String dim2,
String dimLong,
String dimFloat,
String met1
)
{
return new KinesisRecordEntity(new Record().withData(jb(timestamp, dim1, dim2, dimLong, dimFloat, met1).getBuffer()));
}
@JsonTypeName("index_kinesis") @JsonTypeName("index_kinesis")
private static class TestableKinesisIndexTask extends KinesisIndexTask private static class TestableKinesisIndexTask extends KinesisIndexTask
{ {
@ -2497,15 +2511,15 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
/** /**
* Utility class to keep the test code more readable. * Utility class to keep the test code more readable.
*/ */
private static class KinesisRecord extends OrderedPartitionableRecord<String, String, ByteEntity> private static class KinesisRecord extends OrderedPartitionableRecord<String, String, KinesisRecordEntity>
{ {
private final List<ByteEntity> data; private final List<KinesisRecordEntity> data;
public KinesisRecord( public KinesisRecord(
String stream, String stream,
String partitionId, String partitionId,
String sequenceNumber, String sequenceNumber,
List<ByteEntity> data List<KinesisRecordEntity> data
) )
{ {
super(stream, partitionId, sequenceNumber, data); super(stream, partitionId, sequenceNumber, data);
@ -2514,7 +2528,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
@Nonnull @Nonnull
@Override @Override
public List<ByteEntity> getData() public List<KinesisRecordEntity> getData()
{ {
return data; return data;
} }

View File

@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
@ -99,34 +100,26 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"), new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"),
new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9") new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9")
); );
private static final List<OrderedPartitionableRecord<String, String, ByteEntity>> ALL_RECORDS = ImmutableList.<OrderedPartitionableRecord<String, String, ByteEntity>>builder() private static final List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> ALL_RECORDS = ImmutableList.<OrderedPartitionableRecord<String, String, KinesisRecordEntity>>builder()
.addAll(SHARD0_RECORDS.stream() .addAll(SHARD0_RECORDS.stream()
.map(x -> new OrderedPartitionableRecord<>( .map(x -> new OrderedPartitionableRecord<>(
STREAM, STREAM,
SHARD_ID0, SHARD_ID0,
x.getSequenceNumber(), x.getSequenceNumber(),
Collections Collections.singletonList(new KinesisRecordEntity(new Record().withData(new ByteEntity(x.getData()).getBuffer())))
.singletonList( ))
new ByteEntity( .collect(
x.getData())) Collectors
)) .toList()))
.collect( .addAll(SHARD1_RECORDS.stream()
Collectors .map(x -> new OrderedPartitionableRecord<>(
.toList())) STREAM,
.addAll(SHARD1_RECORDS.stream() SHARD_ID1,
.map(x -> new OrderedPartitionableRecord<>( x.getSequenceNumber(),
STREAM, Collections.singletonList(new KinesisRecordEntity(new Record().withData(new ByteEntity(x.getData()).getBuffer())))
SHARD_ID1, ))
x.getSequenceNumber(), .collect(Collectors.toList()))
Collections .build();
.singletonList(
new ByteEntity(
x.getData()))
))
.collect(
Collectors
.toList()))
.build();
private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1) private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
@ -316,7 +309,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
} }
// filter out EOS markers // filter out EOS markers
private static List<OrderedPartitionableRecord<String, String, ByteEntity>> cleanRecords(List<OrderedPartitionableRecord<String, String, ByteEntity>> records) private static List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> cleanRecords(List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> records)
{ {
return records.stream() return records.stream()
.filter(x -> !x.getSequenceNumber() .filter(x -> !x.getSequenceNumber()
@ -398,7 +391,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Thread.sleep(100); Thread.sleep(100);
} }
List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll( List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS)); POLL_TIMEOUT_MILLIS));
verifyAll(); verifyAll();
@ -457,7 +450,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
} }
Assert.assertFalse(recordSupplier.isAnyFetchActive()); Assert.assertFalse(recordSupplier.isAnyFetchActive());
List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll( List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS)); POLL_TIMEOUT_MILLIS));
verifyAll(); verifyAll();
@ -531,7 +524,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Thread.sleep(100); Thread.sleep(100);
} }
List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll( List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS)); POLL_TIMEOUT_MILLIS));
verifyAll(); verifyAll();
@ -687,7 +680,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Thread.sleep(100); Thread.sleep(100);
} }
OrderedPartitionableRecord<String, String, ByteEntity> firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); OrderedPartitionableRecord<String, String, KinesisRecordEntity> firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0);
Assert.assertEquals( Assert.assertEquals(
ALL_RECORDS.get(7), ALL_RECORDS.get(7),
@ -705,7 +698,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
} }
OrderedPartitionableRecord<String, String, ByteEntity> record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); OrderedPartitionableRecord<String, String, KinesisRecordEntity> record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0);
Assert.assertEquals(ALL_RECORDS.get(9), record2); Assert.assertEquals(ALL_RECORDS.get(9), record2);
// only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS // only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS
@ -776,7 +769,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Thread.sleep(100); Thread.sleep(100);
} }
List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll( List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS)); POLL_TIMEOUT_MILLIS));
verifyAll(); verifyAll();

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.kinesis; package org.apache.druid.indexing.kinesis;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
@ -27,7 +28,6 @@ import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.client.indexing.SamplerSpec; import org.apache.druid.client.indexing.SamplerSpec;
import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.InputRowParser;
@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
@ -63,6 +64,7 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
@ -99,7 +101,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class); private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class);
private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateRecords(String stream) private static List<OrderedPartitionableRecord<String, String, KinesisRecordEntity>> generateRecords(String stream)
{ {
return ImmutableList.of( return ImmutableList.of(
new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
@ -115,9 +117,9 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
stream, stream,
"1", "1",
"6", "6",
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable"))) Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("unparseable")))))
), ),
new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))) new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("{}"))))))
); );
} }
@ -428,19 +430,19 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
Assert.assertFalse(it.hasNext()); Assert.assertFalse(it.hasNext());
} }
private static List<ByteEntity> jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1) private static List<KinesisRecordEntity> jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1)
{ {
try { try {
return Collections.singletonList(new ByteEntity(new ObjectMapper().writeValueAsBytes( return Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes(
ImmutableMap.builder() ImmutableMap.builder()
.put("timestamp", ts) .put("timestamp", ts)
.put("dim1", dim1) .put("dim1", dim1)
.put("dim2", dim2) .put("dim2", dim2)
.put("dimLong", dimLong) .put("dimLong", dimLong)
.put("dimFloat", dimFloat) .put("dimFloat", dimFloat)
.put("met1", met1) .put("met1", met1)
.build() .build()
))); )))));
} }
catch (Exception e) { catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -27,12 +27,12 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider; import org.apache.druid.indexing.common.TaskInfoProvider;
@ -5656,7 +5656,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
} }
@Override @Override
protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier() protected RecordSupplier<String, String, KinesisRecordEntity> setupRecordSupplier()
{ {
return supervisorRecordSupplier; return supervisorRecordSupplier;
} }