From 9b731e8f0a0fb4bc5ef1faa7ad8292cd09159a7b Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 2 Aug 2024 08:48:44 -0400 Subject: [PATCH] Kinesis Input Format for timestamp, and payload parsing (#16813) * SQL syntax error should target USER persona * * revert change to queryHandler and related tests, based on review comments * * add test * Introduce KinesisRecordEntity to support Kinesis headers in InputFormats * * add kinesisInputFormat and Reader, and tests * * bind KinesisInputFormat class to module * * improve test coverage * * remove references to kafka * * resolve review comments * * remove comment * * fix grammer of comment * * fix comment again * * fix comment again * * more review comments * * add partitionKey * * add check for same timestamp and partitionKey column name * * fix intellij inspection --- .../data/input/kafka/KafkaRecordEntity.java | 2 - .../input/kinesis/KinesisInputFormat.java | 157 +++ .../input/kinesis/KinesisInputReader.java | 256 +++++ .../input/kinesis/KinesisRecordEntity.java | 51 + .../indexing/kinesis/KinesisIndexTask.java | 6 +- .../kinesis/KinesisIndexTaskRunner.java | 10 +- .../kinesis/KinesisIndexingServiceModule.java | 4 +- .../kinesis/KinesisRecordSupplier.java | 28 +- .../kinesis/supervisor/KinesisSupervisor.java | 10 +- .../input/kinesis/KinesisInputFormatTest.java | 940 ++++++++++++++++++ .../kinesis/KinesisIndexTaskTest.java | 94 +- .../kinesis/KinesisRecordSupplierTest.java | 63 +- .../kinesis/KinesisSamplerSpecTest.java | 30 +- .../supervisor/KinesisSupervisorTest.java | 4 +- 14 files changed, 1534 insertions(+), 121 deletions(-) create mode 100644 extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java create mode 100644 extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java create mode 100644 extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java create mode 100644 extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java index 41c2c0a0325..53369cc6ad6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java @@ -33,8 +33,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; * key, and timestamp. *

* NOTE: Any records with null values will be skipped, even if they contain non-null keys, or headers - *

- * This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions. */ public class KafkaRecordEntity extends ByteEntity { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java new file mode 100644 index 00000000000..7d97fffed37 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.DateTimes; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Objects; + +/** + * Kinesis aware InputFormat. Allows for reading kinesis specific values that are stored in the {@link Record}. At + * this time, this input format only supports reading data from the following record components + *

+ * - {@link Record#data} + * - {@link Record#approximateArrivalTimestamp} + * - {@link Record#partitionKey} + *

+ * This class can be extended easily to read other fields available in the kinesis record. + */ +public class KinesisInputFormat implements InputFormat +{ + private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp"; + private static final String DEFAULT_PARTITION_KEY_COLUMN_NAME = "kinesis.partitionKey"; + + // Since KinesisInputFormat blends data from record properties, and payload, timestamp spec can be pointing to an + // attribute within one of these 2 sections. To handle scenarios where there is no timestamp value in the payload, we + // induce an artificial timestamp value to avoid unnecessary parser barf out. Users in such situations can use the + // inputFormat's kinesis record timestamp as its primary timestamp. + public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; + private final TimestampSpec dummyTimestampSpec = new TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH); + + private final InputFormat valueFormat; + private final String timestampColumnName; + private final String partitionKeyColumnName; + + public KinesisInputFormat( + @JsonProperty("valueFormat") InputFormat valueFormat, + @JsonProperty("partitionKeyColumnName") @Nullable String partitionKeyColumnName, + @JsonProperty("timestampColumnName") @Nullable String timestampColumnName + ) + { + this.valueFormat = Preconditions.checkNotNull(valueFormat, "valueFormat must not be null"); + Preconditions.checkState( + !(timestampColumnName != null && timestampColumnName.equals(partitionKeyColumnName)), + "timestampColumnName and partitionKeyColumnName must be different" + ); + this.partitionKeyColumnName = partitionKeyColumnName != null + ? partitionKeyColumnName + : DEFAULT_PARTITION_KEY_COLUMN_NAME; + this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + final SettableByteEntity settableByteEntitySource; + if (source instanceof SettableByteEntity) { + settableByteEntitySource = (SettableByteEntity) source; + } else { + settableByteEntitySource = new SettableByteEntity<>(); + settableByteEntitySource.setEntity((KinesisRecordEntity) source); + } + InputRowSchema newInputRowSchema = new InputRowSchema( + dummyTimestampSpec, + inputRowSchema.getDimensionsSpec(), + inputRowSchema.getColumnsFilter(), + inputRowSchema.getMetricNames() + ); + return new KinesisInputReader( + inputRowSchema, + settableByteEntitySource, + JsonInputFormat.withLineSplittable(valueFormat, false).createReader( + newInputRowSchema, + source, + temporaryDirectory + ), + partitionKeyColumnName, + timestampColumnName + ); + } + + @JsonProperty + public InputFormat getValueFormat() + { + return valueFormat; + } + + @Nullable + @JsonProperty + public String getTimestampColumnName() + { + return timestampColumnName; + } + + @Nullable + @JsonProperty + public String getPartitionKeyColumnName() + { + return partitionKeyColumnName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KinesisInputFormat that = (KinesisInputFormat) o; + return Objects.equals(valueFormat, that.valueFormat) + && Objects.equals(timestampColumnName, that.timestampColumnName) + && Objects.equals(partitionKeyColumnName, that.partitionKeyColumnName); + } + + @Override + public int hashCode() + { + return Objects.hash(valueFormat, timestampColumnName, partitionKeyColumnName); + } +} diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java new file mode 100644 index 00000000000..d0c30280a2b --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class KinesisInputReader implements InputEntityReader +{ + + private final InputRowSchema inputRowSchema; + private final SettableByteEntity source; + private final InputEntityReader valueParser; + private final String partitionKeyColumnName; + private final String timestampColumnName; + + public KinesisInputReader( + InputRowSchema inputRowSchema, + SettableByteEntity source, + InputEntityReader valueParser, + String partitionKeyColumnName, + String timestampColumnName + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.valueParser = valueParser; + this.partitionKeyColumnName = partitionKeyColumnName; + this.timestampColumnName = timestampColumnName; + + } + + @Override + public CloseableIterator read() throws IOException + { + final KinesisRecordEntity record = source.getEntity(); + final Map mergedHeaderMap = extractHeaders(record); + + if (record.getRecord().getData() != null) { + return buildBlendedRows(valueParser, mergedHeaderMap); + } else { + return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator()); + } + } + + @Override + public CloseableIterator sample() throws IOException + { + final KinesisRecordEntity record = source.getEntity(); + InputRowListPlusRawValues headers = extractHeaderSample(record); + if (record.getRecord().getData() != null) { + return buildBlendedRowsSample(valueParser, headers.getRawValues()); + } else { + final List rows = Collections.singletonList(headers); + return CloseableIterators.withEmptyBaggage(rows.iterator()); + } + } + + private Map extractHeaders(KinesisRecordEntity record) + { + final Map mergedHeaderMap = new HashMap<>(); + mergedHeaderMap.put(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime()); + mergedHeaderMap.put(partitionKeyColumnName, record.getRecord().getPartitionKey()); + return mergedHeaderMap; + } + + private CloseableIterator buildBlendedRows( + InputEntityReader valueParser, + Map headerKeyList + ) throws IOException + { + return valueParser.read().map( + r -> { + final HashSet newDimensions = new HashSet<>(r.getDimensions()); + final Map event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList); + newDimensions.addAll(headerKeyList.keySet()); + // Remove the dummy timestamp added in KinesisInputFormat + newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); + + final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event); + return new MapBasedInputRow( + timestamp, + MapInputRowParser.findDimensions( + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + newDimensions + ), + event + ); + } + ); + } + + private InputRowListPlusRawValues extractHeaderSample(KinesisRecordEntity record) + { + Map mergedHeaderMap = extractHeaders(record); + return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap); + } + + private CloseableIterator buildBlendedRowsSample( + InputEntityReader valueParser, + Map headerKeyList + ) throws IOException + { + return valueParser.sample().map( + rowAndValues -> { + if (rowAndValues.getParseException() != null) { + return rowAndValues; + } + List newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size()); + List> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size()); + + for (Map raw : rowAndValues.getRawValuesList()) { + newRawRows.add(buildBlendedEventMap(raw::get, raw.keySet(), headerKeyList)); + } + for (InputRow r : rowAndValues.getInputRows()) { + if (r != null) { + final HashSet newDimensions = new HashSet<>(r.getDimensions()); + final Map event = buildBlendedEventMap( + r::getRaw, + newDimensions, + headerKeyList + ); + newDimensions.addAll(headerKeyList.keySet()); + // Remove the dummy timestamp added in KinesisInputFormat + newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); + newInputRows.add( + new MapBasedInputRow( + inputRowSchema.getTimestampSpec().extractTimestamp(event), + MapInputRowParser.findDimensions( + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + newDimensions + ), + event + ) + ); + } + } + return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, null); + } + ); + } + + private List buildInputRowsForMap(Map headerKeyList) + { + return Collections.singletonList( + new MapBasedInputRow( + inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList), + MapInputRowParser.findDimensions( + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + headerKeyList.keySet() + ), + headerKeyList + ) + ); + } + + private Map buildBlendedEventMap( + Function getRowValue, + Set rowDimensions, + Map fallback + ) + { + final Set keySet = new HashSet<>(fallback.keySet()); + keySet.addAll(rowDimensions); + + return new AbstractMap() + { + @Override + public Object get(Object key) + { + final String skey = (String) key; + final Object val = getRowValue.apply(skey); + if (val == null) { + return fallback.get(skey); + } + return val; + } + + @Override + public Set keySet() + { + return keySet; + } + + @Override + public Set> entrySet() + { + return keySet().stream() + .map( + field -> new Entry() + { + @Override + public String getKey() + { + return field; + } + + @Override + public Object getValue() + { + return get(field); + } + + @Override + public Object setValue(final Object value) + { + throw new UnsupportedOperationException(); + } + } + ) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + }; + } +} diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java new file mode 100644 index 00000000000..a490fd8f4c3 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.indexing.kinesis.KinesisRecordSupplier; + +/** + * A {@link ByteEntity} generated by {@link KinesisRecordSupplier} and fed to any {@link InputFormat} used by kinesis + * indexing tasks. + *

+ * It can be used as a regular ByteEntity, in which case the kinesis record value is returned, but the {@link #getRecord} + * method also allows Kinesis-aware {@link InputFormat} implementations to read the full kinesis record, including + * timestamp, encrytion key, patition key, and sequence number + *

+ * NOTE: Any records with null values will be returned as records with just only kinesis properties and no data payload + */ +public class KinesisRecordEntity extends ByteEntity +{ + private final Record record; + + public KinesisRecordEntity(Record record) + { + super(record.getData()); + this.record = record; + } + + public Record getRecord() + { + return record; + } +} diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index fb019f10030..bea69d96c3d 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.inject.name.Named; import org.apache.druid.common.aws.AWSCredentialsConfig; -import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.TaskResource; @@ -46,7 +46,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; -public class KinesisIndexTask extends SeekableStreamIndexTask +public class KinesisIndexTask extends SeekableStreamIndexTask { private static final String TYPE = "index_kinesis"; @@ -100,7 +100,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask createTaskRunner() + protected SeekableStreamIndexTaskRunner createTaskRunner() { //noinspection unchecked return new KinesisIndexTaskRunner( diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java index 75f23da0e1f..72e61635912 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java @@ -21,8 +21,8 @@ package org.apache.druid.indexing.kinesis; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; @@ -49,7 +49,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentMap; -public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner +public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner { private static final EmittingLogger log = new EmittingLogger(KinesisIndexTaskRunner.class); private static final long POLL_TIMEOUT = 100; @@ -81,8 +81,8 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner> getRecords( - RecordSupplier recordSupplier, TaskToolbox toolbox + protected List> getRecords( + RecordSupplier recordSupplier, TaskToolbox toolbox ) { return recordSupplier.poll(POLL_TIMEOUT); @@ -119,7 +119,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner recordSupplier, + RecordSupplier recordSupplier, Set> assignment ) { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java index 0cce1a7e698..5ac9022d001 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.name.Names; import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.data.input.kinesis.KinesisInputFormat; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig; @@ -50,7 +51,8 @@ public class KinesisIndexingServiceModule implements DruidModule new NamedType(KinesisIndexTaskIOConfig.class, SCHEME), new NamedType(KinesisSupervisorTuningConfig.class, SCHEME), new NamedType(KinesisSupervisorSpec.class, SCHEME), - new NamedType(KinesisSamplerSpec.class, SCHEME) + new NamedType(KinesisSamplerSpec.class, SCHEME), + new NamedType(KinesisInputFormat.class, SCHEME) ) ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 36047ce429d..07a0da32a95 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -26,6 +26,7 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsRequest; @@ -49,7 +50,7 @@ import com.google.common.collect.Maps; import org.apache.druid.common.aws.AWSClientUtil; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsUtils; -import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; @@ -69,7 +70,6 @@ import javax.annotation.Nullable; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -94,7 +94,7 @@ import java.util.stream.Collectors; * This class implements a local buffer for storing fetched Kinesis records. Fetching is done * in background threads. */ -public class KinesisRecordSupplier implements RecordSupplier +public class KinesisRecordSupplier implements RecordSupplier { private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class); private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000; @@ -210,7 +210,7 @@ public class KinesisRecordSupplier implements RecordSupplier currRecord; + OrderedPartitionableRecord currRecord; long recordBufferOfferWaitMillis; try { @@ -248,7 +248,7 @@ public class KinesisRecordSupplier implements RecordSupplier data; + final List data; if (deaggregateHandle == null || getDataHandle == null) { throw new ISE("deaggregateHandle or getDataHandle is null!"); @@ -256,15 +256,15 @@ public class KinesisRecordSupplier implements RecordSupplier(); - final List userRecords = (List) deaggregateHandle.invokeExact( + final List userRecords = (List) deaggregateHandle.invokeExact( Collections.singletonList(kinesisRecord) ); int recordSize = 0; - for (Object userRecord : userRecords) { - ByteEntity byteEntity = new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord)); - recordSize += byteEntity.getBuffer().array().length; - data.add(byteEntity); + for (UserRecord userRecord : userRecords) { + KinesisRecordEntity kinesisRecordEntity = new KinesisRecordEntity(userRecord); + recordSize += kinesisRecordEntity.getBuffer().array().length; + data.add(kinesisRecordEntity); } @@ -408,7 +408,7 @@ public class KinesisRecordSupplier implements RecordSupplier, PartitionResource> partitionResources = new ConcurrentHashMap<>(); - private MemoryBoundLinkedBlockingQueue> records; + private MemoryBoundLinkedBlockingQueue> records; private final boolean backgroundFetchEnabled; private volatile boolean closed = false; @@ -615,12 +615,12 @@ public class KinesisRecordSupplier implements RecordSupplier> poll(long timeout) + public List> poll(long timeout) { start(); try { - List>> polledRecords = new ArrayList<>(); + List>> polledRecords = new ArrayList<>(); records.drain( polledRecords, @@ -1040,7 +1040,7 @@ public class KinesisRecordSupplier implements RecordSupplier> newQ = + MemoryBoundLinkedBlockingQueue> newQ = new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes); records.stream() diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index a142f414762..2f00c8c16cc 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata; @@ -74,7 +74,7 @@ import java.util.stream.Collectors; * tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of * Kinesis sequences. */ -public class KinesisSupervisor extends SeekableStreamSupervisor +public class KinesisSupervisor extends SeekableStreamSupervisor { private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class); @@ -150,7 +150,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor> createIndexTasks( + protected List> createIndexTasks( int replicas, String baseSequenceName, ObjectMapper sortingMapper, @@ -164,7 +164,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor context = createBaseTaskContexts(); context.put(CHECKPOINTS_CTX_KEY, checkpoints); - List> taskList = new ArrayList<>(); + List> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName); taskList.add(new KinesisIndexTask( @@ -183,7 +183,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor setupRecordSupplier() throws RuntimeException + protected RecordSupplier setupRecordSupplier() throws RuntimeException { KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java new file mode 100644 index 00000000000..130f31681de --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java @@ -0,0 +1,940 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +public class KinesisInputFormatTest +{ + static { + NullHandling.initializeForTests(); + } + + + private static final String KINESIS_APPROXIMATE_TIME_DATE = "2024-07-29"; + private static final long KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS = DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis(); + private static final String DATA_TIMSTAMP_DATE = "2024-07-30"; + private static final String PARTITION_KEY = "partition_key_1"; + + private static final byte[] SIMPLE_JSON_VALUE_BYTES = StringUtils.toUtf8( + TestUtils.singleQuoteToStandardJson( + "{" + + " 'timestamp': '" + DATA_TIMSTAMP_DATE + "'," + + " 'bar': null," + + " 'foo': 'x'," + + " 'baz': 4," + + " 'o': {'mg': 1}" + + "}" + ) + ); + + private KinesisInputFormat format; + + @Before + public void setUp() + { + format = new KinesisInputFormat( + // Value Format + new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, + false + ), + "kinesis.newts.partitionKey", + "kinesis.newts.timestamp" + ); + } + + @Test + public void testSerde() throws JsonProcessingException + { + final ObjectMapper mapper = new ObjectMapper(); + KinesisInputFormat kif = new KinesisInputFormat( + // Value Format + new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, + false + ), + "kinesis.newts.partitionKey", + "kinesis.newts.timestamp" + ); + Assert.assertEquals(format, kif); + + final byte[] formatBytes = mapper.writeValueAsBytes(format); + final byte[] kifBytes = mapper.writeValueAsBytes(kif); + Assert.assertArrayEquals(formatBytes, kifBytes); + } + + @Test + public void testTimestampFromHeader() throws IOException + { + KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("kinesis.newts.timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testRawSample() throws IOException + { + KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.sample()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRowListPlusRawValues rawValues = iterator.next(); + Assert.assertEquals(1, rawValues.getInputRows().size()); + InputRow row = rawValues.getInputRows().get(0); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals( + String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testProcessesSampleTimestampFromHeader() throws IOException + { + KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("kinesis.newts.timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.sample()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRowListPlusRawValues rawValues = iterator.next(); + Assert.assertEquals(1, rawValues.getInputRows().size()); + InputRow row = rawValues.getInputRows().get(0); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of(String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS)), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithMultipleMixedRecordsTimestampFromHeader() throws IOException + { + final byte[][] values = new byte[5][]; + for (int i = 0; i < values.length; i++) { + values[i] = StringUtils.toUtf8( + "{\n" + + " \"timestamp\": \"2024-07-2" + i + "\",\n" + + " \"bar\": null,\n" + + " \"foo\": \"x\",\n" + + " \"baz\": 4,\n" + + " \"index\": " + i + ",\n" + + " \"o\": {\n" + + " \"mg\": 1\n" + + " }\n" + + "}" + ); + } + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("kinesis.newts.timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kinesis.newts.timestamp" + ) + ) + ), + ColumnsFilter.all() + ), + settableByteEntity, + null + ); + + for (int i = 0; i < values.length; i++) { + KinesisRecordEntity inputEntity = makeInputEntity(values[i], DateTimes.of("2024-07-1" + i).getMillis()); + settableByteEntity.setEntity(inputEntity); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + // Payload verification + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + Assert.assertEquals(DateTimes.of("2024-07-1" + i), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of("2024-07-1" + i).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + } + + @Test + public void testTimestampFromData() throws IOException + { + KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithMultipleMixedRecordsTimestampFromData() throws IOException + { + final byte[][] values = new byte[5][]; + for (int i = 0; i < values.length; i++) { + values[i] = StringUtils.toUtf8( + "{\n" + + " \"timestamp\": \"2024-07-2" + i + "\",\n" + + " \"bar\": null,\n" + + " \"foo\": \"x\",\n" + + " \"baz\": 4,\n" + + " \"index\": " + i + ",\n" + + " \"o\": {\n" + + " \"mg\": 1\n" + + " }\n" + + "}" + ); + } + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kinesis.newts.timestamp" + ) + ) + ), + ColumnsFilter.all() + ), + settableByteEntity, + null + ); + + for (int i = 0; i < values.length; i++) { + KinesisRecordEntity inputEntity = makeInputEntity(values[i], KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + settableByteEntity.setEntity(inputEntity); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + // Payload verification + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + Assert.assertEquals(DateTimes.of("2024-07-2" + i), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of("2024-07-29").getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + } + + @Test + public void testMissingTimestampThrowsException() throws IOException + { + KinesisRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("time", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kinesis.newts.timestamp" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + try (CloseableIterator iterator = reader.read()) { + while (iterator.hasNext()) { + Throwable t = Assert.assertThrows(ParseException.class, iterator::next); + Assert.assertTrue( + t.getMessage().startsWith("Timestamp[null] is unparseable! Event: {") + ); + } + } + } + + @Test + public void testWithSchemaDiscoveryKinesisTimestampExcluded() throws IOException + { + KinesisRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + DimensionsSpec.builder() + .useSchemaDiscovery(true) + .setDimensionExclusions(ImmutableList.of("kinesis.newts.timestamp")) + .build(), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + List expectedDimensions = Arrays.asList( + "foo", + "root_baz", + "o", + "bar", + "path_omg", + "jq_omg", + "jq_omg2", + "baz", + "root_baz2", + "path_omg2", + "kinesis.newts.partitionKey" + ); + Collections.sort(expectedDimensions); + Collections.sort(row.getDimensions()); + Assert.assertEquals( + expectedDimensions, + row.getDimensions() + ); + + // Payload verifications + Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithSchemaDiscoveryTimestampFromHeader() throws IOException + { + KinesisRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("kinesis.newts.timestamp", "iso", null), + DimensionsSpec.builder() + .useSchemaDiscovery(true) + .build(), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + List expectedDimensions = Arrays.asList( + "foo", + "timestamp", + "root_baz", + "o", + "bar", + "path_omg", + "jq_omg", + "jq_omg2", + "baz", + "root_baz2", + "path_omg2", + "kinesis.newts.partitionKey" + ); + Collections.sort(expectedDimensions); + Collections.sort(row.getDimensions()); + Assert.assertEquals( + expectedDimensions, + row.getDimensions() + ); + + // Payload verifications + Assert.assertEquals(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testValueInCsvFormat() throws IOException + { + format = new KinesisInputFormat( + // Value Format + new CsvInputFormat( + Arrays.asList("foo", "bar", "timestamp", "baz"), + null, + false, + false, + 0 + ), + "kinesis.newts.partitionKey", + "kinesis.newts.timestamp" + ); + + KinesisRecordEntity inputEntity = + makeInputEntity(StringUtils.toUtf8("x,,2024-07-30,4"), KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kinesis.newts.timestamp", + "kinesis.newts.partitionKey" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + Assert.assertEquals( + Arrays.asList( + "bar", + "foo", + "kinesis.newts.timestamp", + "kinesis.newts.partitionKey" + ), + row.getDimensions() + ); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of("2024-07-30"), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertTrue(row.getDimension("bar").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithPartialDeclarationSchemaDiscovery() throws IOException + { + KinesisRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + DimensionsSpec.builder().setDimensions( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar")) + ).useSchemaDiscovery(true).build(), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + List expectedDimensions = Arrays.asList( + "bar", + "foo", + "kinesis.newts.timestamp", + "kinesis.newts.partitionKey", + "root_baz", + "o", + "path_omg", + "jq_omg", + "jq_omg2", + "baz", + "root_baz2", + "path_omg2" + ); + Collections.sort(expectedDimensions); + Collections.sort(row.getDimensions()); + Assert.assertEquals( + expectedDimensions, + row.getDimensions() + ); + + // Payload verifications + Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + @SuppressWarnings("ResultOfMethodCallIgnored") + public void testValidInputFormatConstruction() + { + InputFormat valueFormat = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, + false + ); + // null partitionKeyColumnName and null timestampColumnName is valid + new KinesisInputFormat(valueFormat, null, null); + + // non-null partitionKeyColumnName and null timestampColumnName is valid + new KinesisInputFormat(valueFormat, "kinesis.partitionKey", null); + + // null partitionKeyColumnName and non-null timestampColumnName is valid + new KinesisInputFormat(valueFormat, null, "kinesis.timestamp"); + + // non-null partitionKeyColumnName and non-null timestampColumnName is valid + new KinesisInputFormat(valueFormat, "kinesis.partitionKey", "kinesis.timestamp"); + + } + + @Test + @SuppressWarnings("ResultOfMethodCallIgnored") + public void testInvalidInputFormatConstruction() + { + // null value format is invalid + Assert.assertThrows( + "valueFormat must not be null", + NullPointerException.class, + () -> new KinesisInputFormat(null, null, null) + ); + + InputFormat valueFormat = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, + false + ); + + // partitionKeyColumnName == timestampColumnName is invalid + Assert.assertThrows( + "timestampColumnName and partitionKeyColumnName must be different", + IllegalStateException.class, + () -> new KinesisInputFormat(valueFormat, "kinesis.timestamp", "kinesis.timestamp") + ); + } + + private KinesisRecordEntity makeInputEntity( + byte[] payload, + long kinesisTimestampMillis) + { + return new KinesisRecordEntity( + new Record().withData(ByteBuffer.wrap(payload)) + .withApproximateArrivalTimestamp(new Date(kinesisTimestampMillis)) + .withPartitionKey(PARTITION_KEY) + ); + } + + private SettableByteEntity newSettableByteEntity(KinesisRecordEntity kinesisRecordEntity) + { + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(kinesisRecordEntity); + return settableByteEntity; + } +} diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 527a6738ffe..80bded2031d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.kinesis; +import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -41,6 +42,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.report.IngestionStatsAndErrors; @@ -127,39 +129,39 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase private static final String SHARD_ID0 = "0"; private static final List RECORDS = Arrays.asList( - createRecord("1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), - createRecord("1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), - createRecord("1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), - createRecord("1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), - createRecord("1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), - createRecord("1", "5", jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), - createRecord("1", "6", new ByteEntity(StringUtils.toUtf8("unparseable"))), - createRecord("1", "7", new ByteEntity(StringUtils.toUtf8(""))), - createRecord("1", "8", new ByteEntity(StringUtils.toUtf8("{}"))), - createRecord("1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")), - createRecord("1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")), - createRecord("1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")), - createRecord("1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")), - createRecord("0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")), - createRecord("0", "1", jb("2011", "h", "y", "10", "20.0", "1.0")) + createRecord("1", "0", kjb("2008", "a", "y", "10", "20.0", "1.0")), + createRecord("1", "1", kjb("2009", "b", "y", "10", "20.0", "1.0")), + createRecord("1", "2", kjb("2010", "c", "y", "10", "20.0", "1.0")), + createRecord("1", "3", kjb("2011", "d", "y", "10", "20.0", "1.0")), + createRecord("1", "4", kjb("2011", "e", "y", "10", "20.0", "1.0")), + createRecord("1", "5", kjb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), + createRecord("1", "6", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("unparseable")).getBuffer()))), + createRecord("1", "7", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("")).getBuffer()))), + createRecord("1", "8", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("{}")).getBuffer()))), + createRecord("1", "9", kjb("2013", "f", "y", "10", "20.0", "1.0")), + createRecord("1", "10", kjb("2049", "f", "y", "notanumber", "20.0", "1.0")), + createRecord("1", "11", kjb("2049", "f", "y", "10", "notanumber", "1.0")), + createRecord("1", "12", kjb("2049", "f", "y", "10", "20.0", "notanumber")), + createRecord("0", "0", kjb("2012", "g", "y", "10", "20.0", "1.0")), + createRecord("0", "1", kjb("2011", "h", "y", "10", "20.0", "1.0")) ); private static final List SINGLE_PARTITION_RECORDS = Arrays.asList( - createRecord("1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), - createRecord("1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), - createRecord("1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), - createRecord("1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), - createRecord("1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), - createRecord("1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")), - createRecord("1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")), - createRecord("1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")), - createRecord("1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")), - createRecord("1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")), - createRecord("1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")), - createRecord("1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")), - createRecord("1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")), - createRecord("1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")), - createRecord("1", "14", jb("2013", "e", "y", "10", "20.0", "1.0")) + createRecord("1", "0", kjb("2008", "a", "y", "10", "20.0", "1.0")), + createRecord("1", "1", kjb("2009", "b", "y", "10", "20.0", "1.0")), + createRecord("1", "2", kjb("2010", "c", "y", "10", "20.0", "1.0")), + createRecord("1", "3", kjb("2011", "d", "y", "10", "20.0", "1.0")), + createRecord("1", "4", kjb("2011", "e", "y", "10", "20.0", "1.0")), + createRecord("1", "5", kjb("2012", "a", "y", "10", "20.0", "1.0")), + createRecord("1", "6", kjb("2013", "b", "y", "10", "20.0", "1.0")), + createRecord("1", "7", kjb("2010", "c", "y", "10", "20.0", "1.0")), + createRecord("1", "8", kjb("2011", "d", "y", "10", "20.0", "1.0")), + createRecord("1", "9", kjb("2011", "e", "y", "10", "20.0", "1.0")), + createRecord("1", "10", kjb("2008", "a", "y", "10", "20.0", "1.0")), + createRecord("1", "11", kjb("2009", "b", "y", "10", "20.0", "1.0")), + createRecord("1", "12", kjb("2010", "c", "y", "10", "20.0", "1.0")), + createRecord("1", "13", kjb("2012", "d", "y", "10", "20.0", "1.0")), + createRecord("1", "14", kjb("2013", "e", "y", "10", "20.0", "1.0")) ); private static KinesisRecordSupplier recordSupplier; @@ -272,12 +274,12 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase record.getPartitionId(), record.getSequenceNumber(), record.getData().stream() - .map(entity -> new ByteEntity(entity.getBuffer())) + .map(entity -> new KinesisRecordEntity(new Record().withData(entity.getBuffer()))) .collect(Collectors.toList()) ); } - private static List> clone( + private static List> clone( List records, int start, int end @@ -289,14 +291,14 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase /** * Records can only be read once, hence we must use fresh records every time. */ - private static List> clone( + private static List> clone( List records ) { return records.stream().map(KinesisIndexTaskTest::clone).collect(Collectors.toList()); } - private static KinesisRecord createRecord(String partitionId, String sequenceNumber, ByteEntity entity) + private static KinesisRecord createRecord(String partitionId, String sequenceNumber, KinesisRecordEntity entity) { return new KinesisRecord(STREAM, partitionId, sequenceNumber, Collections.singletonList(entity)); } @@ -1697,7 +1699,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase maxRowsPerSegment = 2; maxRecordsPerPoll = 1; maxBytesPerPoll = 1_000_000; - List> records = + List> records = clone(SINGLE_PARTITION_RECORDS); recordSupplier.assign(EasyMock.anyObject()); @@ -2148,7 +2150,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - List> eosRecord = ImmutableList.of( + List> eosRecord = ImmutableList.of( new OrderedPartitionableRecord<>(STREAM, SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER, null) ); @@ -2454,6 +2456,18 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING; } + private static KinesisRecordEntity kjb( + String timestamp, + String dim1, + String dim2, + String dimLong, + String dimFloat, + String met1 + ) + { + return new KinesisRecordEntity(new Record().withData(jb(timestamp, dim1, dim2, dimLong, dimFloat, met1).getBuffer())); + } + @JsonTypeName("index_kinesis") private static class TestableKinesisIndexTask extends KinesisIndexTask { @@ -2497,15 +2511,15 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase /** * Utility class to keep the test code more readable. */ - private static class KinesisRecord extends OrderedPartitionableRecord + private static class KinesisRecord extends OrderedPartitionableRecord { - private final List data; + private final List data; public KinesisRecord( String stream, String partitionId, String sequenceNumber, - List data + List data ) { super(stream, partitionId, sequenceNumber, data); @@ -2514,7 +2528,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase @Nonnull @Override - public List getData() + public List getData() { return data; } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 5fcf81139eb..7c59ad61ac0 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; @@ -99,34 +100,26 @@ public class KinesisRecordSupplierTest extends EasyMockSupport new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"), new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9") ); - private static final List> ALL_RECORDS = ImmutableList.>builder() - .addAll(SHARD0_RECORDS.stream() - .map(x -> new OrderedPartitionableRecord<>( - STREAM, - SHARD_ID0, - x.getSequenceNumber(), - Collections - .singletonList( - new ByteEntity( - x.getData())) - )) - .collect( - Collectors - .toList())) - .addAll(SHARD1_RECORDS.stream() - .map(x -> new OrderedPartitionableRecord<>( - STREAM, - SHARD_ID1, - x.getSequenceNumber(), - Collections - .singletonList( - new ByteEntity( - x.getData())) - )) - .collect( - Collectors - .toList())) - .build(); + private static final List> ALL_RECORDS = ImmutableList.>builder() + .addAll(SHARD0_RECORDS.stream() + .map(x -> new OrderedPartitionableRecord<>( + STREAM, + SHARD_ID0, + x.getSequenceNumber(), + Collections.singletonList(new KinesisRecordEntity(new Record().withData(new ByteEntity(x.getData()).getBuffer()))) + )) + .collect( + Collectors + .toList())) + .addAll(SHARD1_RECORDS.stream() + .map(x -> new OrderedPartitionableRecord<>( + STREAM, + SHARD_ID1, + x.getSequenceNumber(), + Collections.singletonList(new KinesisRecordEntity(new Record().withData(new ByteEntity(x.getData()).getBuffer()))) + )) + .collect(Collectors.toList())) + .build(); private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1) @@ -316,7 +309,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport } // filter out EOS markers - private static List> cleanRecords(List> records) + private static List> cleanRecords(List> records) { return records.stream() .filter(x -> !x.getSequenceNumber() @@ -398,7 +391,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -457,7 +450,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport } Assert.assertFalse(recordSupplier.isAnyFetchActive()); - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -531,7 +524,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -687,7 +680,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Thread.sleep(100); } - OrderedPartitionableRecord firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); + OrderedPartitionableRecord firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); Assert.assertEquals( ALL_RECORDS.get(7), @@ -705,7 +698,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport } - OrderedPartitionableRecord record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); + OrderedPartitionableRecord record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); Assert.assertEquals(ALL_RECORDS.get(9), record2); // only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS @@ -776,7 +769,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index b0ba730a350..63144c6a935 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.kinesis; +import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -27,7 +28,6 @@ import org.apache.druid.client.indexing.SamplerResponse; import org.apache.druid.client.indexing.SamplerSpec; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.config.NullHandling; -import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.InputRowParser; @@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; @@ -63,6 +64,7 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -99,7 +101,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class); - private static List> generateRecords(String stream) + private static List> generateRecords(String stream) { return ImmutableList.of( new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), @@ -115,9 +117,9 @@ public class KinesisSamplerSpecTest extends EasyMockSupport stream, "1", "6", - Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable"))) + Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("unparseable"))))) ), - new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))) + new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("{}")))))) ); } @@ -428,19 +430,19 @@ public class KinesisSamplerSpecTest extends EasyMockSupport Assert.assertFalse(it.hasNext()); } - private static List jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1) + private static List jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1) { try { - return Collections.singletonList(new ByteEntity(new ObjectMapper().writeValueAsBytes( + return Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes( ImmutableMap.builder() - .put("timestamp", ts) - .put("dim1", dim1) - .put("dim2", dim2) - .put("dimLong", dimLong) - .put("dimFloat", dimFloat) - .put("met1", met1) - .build() - ))); + .put("timestamp", ts) + .put("dim1", dim1) + .put("dim2", dim2) + .put("dimLong", dimLong) + .put("dimFloat", dimFloat) + .put("met1", met1) + .build() + ))))); } catch (Exception e) { throw new RuntimeException(e); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 9001f148e99..e6ed27c9cec 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -27,12 +27,12 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskInfoProvider; @@ -5656,7 +5656,7 @@ public class KinesisSupervisorTest extends EasyMockSupport } @Override - protected RecordSupplier setupRecordSupplier() + protected RecordSupplier setupRecordSupplier() { return supervisorRecordSupplier; }