mirror of https://github.com/apache/druid.git
Data loader (sampler component) - Kafka/Kinesis samplers (#7566)

* implement Kafka/Kinesis sampler
* add KafkaSamplerSpecTest and KinesisSamplerSpecTest
* code review changes
This commit is contained in:
parent
ec0b7787cf
commit
d38457933f
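
In short: the commit adds KafkaSamplerSpec and KinesisSamplerSpec (plus a shared SeekableStreamSamplerSpec base class) and registers them under the existing "kafka" and "kinesis" type names, so a serialized sampler spec resolves polymorphically to the right implementation. A minimal sketch of that resolution, assuming a mapper that already has the extension module and its injectables (FirehoseSampler, ObjectMapper) wired in; requestBody is hypothetical:

    // Sketch only: the "type" discriminator in the request body selects the
    // sampler subtype via the NamedType registrations in the hunks below.
    SamplerSpec spec = mapper.readValue(requestBody, SamplerSpec.class);
    SamplerResponse response = spec.sample();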
KafkaIndexTaskModule.java
@@ -48,7 +48,8 @@ public class KafkaIndexTaskModule implements DruidModule
             // (Older versions of Druid didn't specify a type name and got this one by default.)
             new NamedType(KafkaIndexTaskTuningConfig.class, "KafkaTuningConfig"),
             new NamedType(KafkaSupervisorTuningConfig.class, "kafka"),
-            new NamedType(KafkaSupervisorSpec.class, "kafka")
+            new NamedType(KafkaSupervisorSpec.class, "kafka"),
+            new NamedType(KafkaSamplerSpec.class, "kafka")
         )
     );
   }

KafkaSamplerSpec.java (new file)
@@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.HashMap;
import java.util.Map;

public class KafkaSamplerSpec extends SeekableStreamSamplerSpec
{
  private final ObjectMapper objectMapper;

  @JsonCreator
  public KafkaSamplerSpec(
      @JsonProperty("spec") final KafkaSupervisorSpec ingestionSpec,
      @JsonProperty("samplerConfig") final SamplerConfig samplerConfig,
      @JacksonInject FirehoseSampler firehoseSampler,
      @JacksonInject ObjectMapper objectMapper
  )
  {
    super(ingestionSpec, samplerConfig, firehoseSampler);

    this.objectMapper = objectMapper;
  }

  @Override
  protected Firehose getFirehose(InputRowParser parser)
  {
    return new KafkaSamplerFirehose(parser);
  }

  protected class KafkaSamplerFirehose extends SeekableStreamSamplerFirehose
  {
    private KafkaSamplerFirehose(InputRowParser parser)
    {
      super(parser);
    }

    @Override
    protected RecordSupplier getRecordSupplier()
    {
      ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
      try {
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());

        final Map<String, Object> props = new HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties());

        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "none");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs()));

        return new KafkaRecordSupplier(props, objectMapper);
      }
      finally {
        Thread.currentThread().setContextClassLoader(currCtxCl);
      }
    }
  }
}

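For orientation, a minimal sketch of driving the new spec directly; KafkaSamplerSpecTest below does this for real against an embedded broker. supervisorSpec and objectMapper are assumed to be in scope, and the first SamplerConfig argument caps the number of sampled rows:

    KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
        supervisorSpec,                          // reuse an existing supervisor spec
        new SamplerConfig(5, null, null, null),  // sample at most five rows
        new FirehoseSampler(objectMapper, new SamplerCache(MapCache.create(100000))),
        objectMapper
    );

    SamplerResponse response = samplerSpec.sample();
    response.getData().forEach(System.out::println);  // parsed rows plus per-row parse errors
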
KafkaSamplerSpecTest.java (new file)
@@ -0,0 +1,270 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerCache;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class KafkaSamplerSpecTest
{
  private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
  private static final String TOPIC = "sampling";
  private static final DataSchema DATA_SCHEMA = new DataSchema(
      "test_ds",
      objectMapper.convertValue(
          new StringInputRowParser(
              new JSONParseSpec(
                  new TimestampSpec("timestamp", "iso", null),
                  new DimensionsSpec(
                      Arrays.asList(
                          new StringDimensionSchema("dim1"),
                          new StringDimensionSchema("dim1t"),
                          new StringDimensionSchema("dim2"),
                          new LongDimensionSchema("dimLong"),
                          new FloatDimensionSchema("dimFloat")
                      ),
                      null,
                      null
                  ),
                  new JSONPathSpec(true, ImmutableList.of()),
                  ImmutableMap.of()
              ),
              StandardCharsets.UTF_8.name()
          ),
          Map.class
      ),
      new AggregatorFactory[]{
          new DoubleSumAggregatorFactory("met1sum", "met1"),
          new CountAggregatorFactory("rows")
      },
      new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
      null,
      objectMapper
  );

  private static TestingCluster zkServer;
  private static TestBroker kafkaServer;

  private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
  {
    return ImmutableList.of(
        new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
        new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
        new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
        new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
        new ProducerRecord<>(topic, 0, null, null)
    );
  }

  @BeforeClass
  public static void setupClass() throws Exception
  {
    zkServer = new TestingCluster(1);
    zkServer.start();

    kafkaServer = new TestBroker(zkServer.getConnectString(), null, 1, ImmutableMap.of("num.partitions", "2"));
    kafkaServer.start();
  }

  @AfterClass
  public static void tearDownClass() throws Exception
  {
    kafkaServer.close();
    zkServer.stop();
  }

  @Test(timeout = 30_000L)
  public void testSample()
  {
    insertData(generateRecords(TOPIC));

    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
        DATA_SCHEMA,
        null,
        new KafkaSupervisorIOConfig(
            TOPIC,
            null,
            null,
            null,
            kafkaServer.consumerProperties(),
            null,
            null,
            null,
            true,
            null,
            null,
            null
        ),
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null
    );

    KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
        supervisorSpec,
        new SamplerConfig(5, null, null, null),
        new FirehoseSampler(objectMapper, new SamplerCache(MapCache.create(100000))),
        objectMapper
    );

    SamplerResponse response = samplerSpec.sample();

    Assert.assertNotNull(response.getCacheKey());
    Assert.assertEquals(5, (int) response.getNumRowsRead());
    Assert.assertEquals(3, (int) response.getNumRowsIndexed());
    Assert.assertEquals(5, response.getData().size());

    Iterator<SamplerResponse.SamplerResponseRow> it = response.getData().iterator();

    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
        ImmutableMap.<String, Object>builder()
            .put("__time", 1199145600000L)
            .put("dim1", "a")
            .put("dim2", "y")
            .put("dimLong", 10L)
            .put("dimFloat", 20.0F)
            .put("rows", 1L)
            .put("met1sum", 1.0)
            .build(),
        null,
        null
    ), it.next());
    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
        ImmutableMap.<String, Object>builder()
            .put("__time", 1230768000000L)
            .put("dim1", "b")
            .put("dim2", "y")
            .put("dimLong", 10L)
            .put("dimFloat", 20.0F)
            .put("rows", 1L)
            .put("met1sum", 1.0)
            .build(),
        null,
        null
    ), it.next());
    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
        ImmutableMap.<String, Object>builder()
            .put("__time", 1262304000000L)
            .put("dim1", "c")
            .put("dim2", "y")
            .put("dimLong", 10L)
            .put("dimFloat", 20.0F)
            .put("rows", 1L)
            .put("met1sum", 1.0)
            .build(),
        null,
        null
    ), it.next());
    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "{\"timestamp\":\"246140482-04-24T15:36:27.903Z\",\"dim1\":\"x\",\"dim2\":\"z\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
        null,
        true,
        "Timestamp cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
    ), it.next());
    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "unparseable",
        null,
        true,
        "Unable to parse row [unparseable]"
    ), it.next());

    Assert.assertFalse(it.hasNext());
  }

  private static void insertData(List<ProducerRecord<byte[], byte[]>> data)
  {
    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
      kafkaProducer.initTransactions();
      kafkaProducer.beginTransaction();

      data.forEach(kafkaProducer::send);

      kafkaProducer.commitTransaction();
    }
  }

  private static byte[] jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
  {
    try {
      return new ObjectMapper().writeValueAsBytes(
          ImmutableMap.builder()
              .put("timestamp", timestamp)
              .put("dim1", dim1)
              .put("dim2", dim2)
              .put("dimLong", dimLong)
              .put("dimFloat", dimFloat)
              .put("met1", met1)
              .build()
      );
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

KinesisIndexingServiceModule.java
@@ -48,7 +48,8 @@ public class KinesisIndexingServiceModule implements DruidModule
             new NamedType(KinesisDataSourceMetadata.class, "kinesis"),
             new NamedType(KinesisIndexTaskIOConfig.class, "kinesis"),
             new NamedType(KinesisSupervisorTuningConfig.class, "kinesis"),
-            new NamedType(KinesisSupervisorSpec.class, "kinesis")
+            new NamedType(KinesisSupervisorSpec.class, "kinesis"),
+            new NamedType(KinesisSamplerSpec.class, "kinesis")
         )
     );
   }

KinesisSamplerSpec.java (new file)
@@ -0,0 +1,92 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.kinesis;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;

public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
{
  private final AWSCredentialsConfig awsCredentialsConfig;

  @JsonCreator
  public KinesisSamplerSpec(
      @JsonProperty("spec") final KinesisSupervisorSpec ingestionSpec,
      @JsonProperty("samplerConfig") final SamplerConfig samplerConfig,
      @JacksonInject FirehoseSampler firehoseSampler,
      @JacksonInject @Named("kinesis") AWSCredentialsConfig awsCredentialsConfig
  )
  {
    super(ingestionSpec, samplerConfig, firehoseSampler);

    this.awsCredentialsConfig = awsCredentialsConfig;
  }

  @Override
  protected Firehose getFirehose(InputRowParser parser)
  {
    return new KinesisSamplerFirehose(parser);
  }

  protected class KinesisSamplerFirehose extends SeekableStreamSamplerFirehose
  {
    protected KinesisSamplerFirehose(InputRowParser parser)
    {
      super(parser);
    }

    @Override
    protected RecordSupplier getRecordSupplier()
    {
      KinesisSupervisorIOConfig ioConfig = (KinesisSupervisorIOConfig) KinesisSamplerSpec.this.ioConfig;
      KinesisSupervisorTuningConfig tuningConfig = ((KinesisSupervisorTuningConfig) KinesisSamplerSpec.this.tuningConfig);

      return new KinesisRecordSupplier(
          KinesisRecordSupplier.getAmazonKinesisClient(
              ioConfig.getEndpoint(),
              awsCredentialsConfig,
              ioConfig.getAwsAssumedRoleArn(),
              ioConfig.getAwsExternalId()
          ),
          ioConfig.getRecordsPerFetch(),
          ioConfig.getFetchDelayMillis(),
          1,
          ioConfig.isDeaggregate(),
          tuningConfig.getRecordBufferSize(),
          tuningConfig.getRecordBufferOfferTimeout(),
          tuningConfig.getRecordBufferFullWait(),
          tuningConfig.getFetchSequenceNumberTimeout(),
          tuningConfig.getMaxRecordsPerPoll()
      );
    }
  }
}

KinesisSamplerSpecTest.java (new file)
@@ -0,0 +1,305 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.kinesis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerCache;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;

public class KinesisSamplerSpecTest extends EasyMockSupport
{
  private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
  private static final String STREAM = "sampling";
  private static final String SHARD_ID = "1";
  private static final DataSchema DATA_SCHEMA = new DataSchema(
      "test_ds",
      objectMapper.convertValue(
          new StringInputRowParser(
              new JSONParseSpec(
                  new TimestampSpec("timestamp", "iso", null),
                  new DimensionsSpec(
                      Arrays.asList(
                          new StringDimensionSchema("dim1"),
                          new StringDimensionSchema("dim1t"),
                          new StringDimensionSchema("dim2"),
                          new LongDimensionSchema("dimLong"),
                          new FloatDimensionSchema("dimFloat")
                      ),
                      null,
                      null
                  ),
                  new JSONPathSpec(true, ImmutableList.of()),
                  ImmutableMap.of()
              ),
              StandardCharsets.UTF_8.name()
          ),
          Map.class
      ),
      new AggregatorFactory[]{
          new DoubleSumAggregatorFactory("met1sum", "met1"),
          new CountAggregatorFactory("rows")
      },
      new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
      null,
      objectMapper
  );

  private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class);

  private static List<OrderedPartitionableRecord<String, String>> generateRecords(String stream)
  {
    return ImmutableList.of(
        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
        new OrderedPartitionableRecord<>(
            stream,
            "1",
            "5",
            jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
        ),
        new OrderedPartitionableRecord<>(
            stream,
            "1",
            "6",
            Collections.singletonList(StringUtils.toUtf8("unparseable"))
        ),
        new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}")))
    );
  }

  @Test(timeout = 10_000L)
  public void testSample() throws Exception
  {
    expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();

    recordSupplier.assign(ImmutableSet.of(StreamPartition.of(STREAM, SHARD_ID)));
    expectLastCall().once();

    recordSupplier.seekToEarliest(ImmutableSet.of(StreamPartition.of(STREAM, SHARD_ID)));
    expectLastCall().once();

    expect(recordSupplier.poll(anyLong())).andReturn(generateRecords(STREAM)).once();

    recordSupplier.close();
    expectLastCall().once();

    replayAll();

    KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
        DATA_SCHEMA,
        null,
        new KinesisSupervisorIOConfig(
            STREAM,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            true,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            false
        ),
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null
    );

    KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
        supervisorSpec,
        new SamplerConfig(5, null, null, null),
        new FirehoseSampler(objectMapper, new SamplerCache(MapCache.create(100000))),
        null
    );

    SamplerResponse response = samplerSpec.sample();

    verifyAll();

    Assert.assertNotNull(response.getCacheKey());
    Assert.assertEquals(5, (int) response.getNumRowsRead());
    Assert.assertEquals(3, (int) response.getNumRowsIndexed());
    Assert.assertEquals(5, response.getData().size());

    Iterator<SamplerResponse.SamplerResponseRow> it = response.getData().iterator();

    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
        ImmutableMap.<String, Object>builder()
            .put("__time", 1199145600000L)
            .put("dim1", "a")
            .put("dim2", "y")
            .put("dimLong", 10L)
            .put("dimFloat", 20.0F)
            .put("rows", 1L)
            .put("met1sum", 1.0)
            .build(),
        null,
        null
    ), it.next());
    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
        ImmutableMap.<String, Object>builder()
            .put("__time", 1230768000000L)
            .put("dim1", "b")
            .put("dim2", "y")
            .put("dimLong", 10L)
            .put("dimFloat", 20.0F)
            .put("rows", 1L)
            .put("met1sum", 1.0)
            .build(),
        null,
        null
    ), it.next());
    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
        ImmutableMap.<String, Object>builder()
            .put("__time", 1262304000000L)
            .put("dim1", "c")
            .put("dim2", "y")
            .put("dimLong", 10L)
            .put("dimFloat", 20.0F)
            .put("rows", 1L)
            .put("met1sum", 1.0)
            .build(),
        null,
        null
    ), it.next());
    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "{\"timestamp\":\"246140482-04-24T15:36:27.903Z\",\"dim1\":\"x\",\"dim2\":\"z\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
        null,
        true,
        "Timestamp cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
    ), it.next());
    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
        "unparseable",
        null,
        true,
        "Unable to parse row [unparseable]"
    ), it.next());

    Assert.assertFalse(it.hasNext());
  }

  private static List<byte[]> jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1)
  {
    try {
      return Collections.singletonList(new ObjectMapper().writeValueAsBytes(
          ImmutableMap.builder()
              .put("timestamp", ts)
              .put("dim1", dim1)
              .put("dim2", dim2)
              .put("dimLong", dimLong)
              .put("dimFloat", dimFloat)
              .put("met1", met1)
              .build()
      ));
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private class TestableKinesisSamplerSpec extends KinesisSamplerSpec
  {
    private TestableKinesisSamplerSpec(
        KinesisSupervisorSpec ingestionSpec,
        SamplerConfig samplerConfig,
        FirehoseSampler firehoseSampler,
        AWSCredentialsConfig awsCredentialsConfig
    )
    {
      super(ingestionSpec, samplerConfig, firehoseSampler, awsCredentialsConfig);
    }

    @Override
    protected Firehose getFirehose(InputRowParser parser)
    {
      return new KinesisSamplerFirehose(parser)
      {
        @Override
        protected RecordSupplier getRecordSupplier()
        {
          return recordSupplier;
        }
      };
    }
  }
}

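The test above avoids any real AWS dependency by mocking KinesisRecordSupplier; TestableKinesisSamplerSpec just swaps the mock in. For readers unfamiliar with EasyMockSupport, a self-contained sketch of the record/replay/verify lifecycle it relies on (the names here are illustrative, not part of the commit):

    import org.easymock.EasyMockSupport;

    import java.util.Iterator;

    import static org.easymock.EasyMock.expect;

    public class MockLifecycleSketch extends EasyMockSupport
    {
      public void demo()
      {
        @SuppressWarnings("unchecked")
        Iterator<String> it = mock(Iterator.class);    // record phase: create the mock
        expect(it.hasNext()).andReturn(false).once();  // declare the expected call

        replayAll();                                   // switch every mock to replay mode

        System.out.println(it.hasNext());              // exercise: prints "false"

        verifyAll();                                   // fail if any expectation went unmet
      }

      public static void main(String[] args)
      {
        new MockLifecycleSketch().demo();
      }
    }
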
SeekableStreamSamplerSpec.java (new file)
@@ -0,0 +1,207 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.seekablestream;

import com.google.common.base.Preconditions;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.overlord.sampler.SamplerException;
import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
import org.apache.druid.indexing.overlord.sampler.SamplerSpec;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.utils.Runnables;

import javax.annotation.Nullable;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetType> implements SamplerSpec
{
  private static final int POLL_TIMEOUT_MS = 100;

  private final DataSchema dataSchema;
  private final FirehoseSampler firehoseSampler;

  protected final SeekableStreamSupervisorIOConfig ioConfig;
  protected final SeekableStreamSupervisorTuningConfig tuningConfig;
  protected final SamplerConfig samplerConfig;

  public SeekableStreamSamplerSpec(
      final SeekableStreamSupervisorSpec ingestionSpec,
      final SamplerConfig samplerConfig,
      final FirehoseSampler firehoseSampler
  )
  {
    this.dataSchema = Preconditions.checkNotNull(ingestionSpec, "[spec] is required").getDataSchema();
    this.ioConfig = Preconditions.checkNotNull(ingestionSpec.getIoConfig(), "[spec.ioConfig] is required");
    this.tuningConfig = ingestionSpec.getTuningConfig();
    this.samplerConfig = samplerConfig;
    this.firehoseSampler = firehoseSampler;
  }

  @Override
  public SamplerResponse sample()
  {
    return firehoseSampler.sample(
        new FirehoseFactory()
        {
          @Override
          public Firehose connect(InputRowParser parser, @Nullable File temporaryDirectory)
          {
            return getFirehose(parser);
          }
        },
        dataSchema,
        samplerConfig
    );
  }

  protected abstract Firehose getFirehose(InputRowParser parser);

  protected abstract class SeekableStreamSamplerFirehose implements Firehose
  {
    private final InputRowParser parser;
    private final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;

    private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> recordIterator;
    private Iterator<byte[]> recordDataIterator;

    private volatile boolean closed = false;

    protected SeekableStreamSamplerFirehose(InputRowParser parser)
    {
      this.parser = parser;

      if (parser instanceof StringInputRowParser) {
        ((StringInputRowParser) parser).startFileFromBeginning();
      }

      this.recordSupplier = getRecordSupplier();

      try {
        assignAndSeek();
      }
      catch (InterruptedException e) {
        throw new SamplerException(e, "Exception while seeking to partitions");
      }
    }

    @Override
    public boolean hasMore()
    {
      return !closed;
    }

    @Nullable
    @Override
    public InputRow nextRow()
    {
      InputRowPlusRaw row = nextRowWithRaw();
      if (row.getParseException() != null) {
        throw row.getParseException();
      }

      return row.getInputRow();
    }

    @Override
    public InputRowPlusRaw nextRowWithRaw()
    {
      if (recordDataIterator == null || !recordDataIterator.hasNext()) {
        if (recordIterator == null || !recordIterator.hasNext()) {
          recordIterator = recordSupplier.poll(POLL_TIMEOUT_MS).iterator();

          if (!recordIterator.hasNext()) {
            return InputRowPlusRaw.of((InputRow) null, null);
          }
        }

        recordDataIterator = recordIterator.next().getData().iterator();

        if (!recordDataIterator.hasNext()) {
          return InputRowPlusRaw.of((InputRow) null, null);
        }
      }

      byte[] raw = recordDataIterator.next();

      try {
        List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(raw));
        return InputRowPlusRaw.of(rows.isEmpty() ? null : rows.get(0), raw);
      }
      catch (ParseException e) {
        return InputRowPlusRaw.of(raw, e);
      }
    }

    @Override
    public Runnable commit()
    {
      return Runnables.getNoopRunnable();
    }

    @Override
    public void close()
    {
      if (closed) {
        return;
      }

      closed = true;
      recordSupplier.close();
    }

    private void assignAndSeek() throws InterruptedException
    {
      final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
          .getPartitionIds(ioConfig.getStream())
          .stream()
          .map(x -> StreamPartition.of(ioConfig.getStream(), x))
          .collect(Collectors.toSet());

      recordSupplier.assign(partitions);

      if (ioConfig.isUseEarliestSequenceNumber()) {
        recordSupplier.seekToEarliest(partitions);
      } else {
        recordSupplier.seekToLatest(partitions);
      }
    }

    protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType> getRecordSupplier();
  }
}
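
The subtle part of nextRowWithRaw() is the two-level drain: an outer iterator over the polled records and an inner iterator over each record's payloads (a Kinesis record can carry several payloads after deaggregation, while a Kafka tombstone carries none). A standalone sketch of just that pattern, with illustrative names that are not part of the commit:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public class TwoLevelIteratorSketch
    {
      private final Iterator<List<byte[]>> recordIterator;  // one entry per polled record
      private Iterator<byte[]> recordDataIterator;          // payloads of the current record

      public TwoLevelIteratorSketch(List<List<byte[]>> polled)
      {
        this.recordIterator = polled.iterator();
      }

      // Returns the next payload, or null when this poll is exhausted (the real
      // firehose re-polls the RecordSupplier and hands back an empty
      // InputRowPlusRaw if nothing arrived).
      public byte[] next()
      {
        if (recordDataIterator == null || !recordDataIterator.hasNext()) {
          if (!recordIterator.hasNext()) {
            return null;
          }
          recordDataIterator = recordIterator.next().iterator();
          if (!recordDataIterator.hasNext()) {
            return null;  // a record with no payloads, e.g. a Kafka tombstone
          }
        }
        return recordDataIterator.next();
      }

      public static void main(String[] args)
      {
        TwoLevelIteratorSketch sketch = new TwoLevelIteratorSketch(Arrays.asList(
            Arrays.asList("a".getBytes(), "b".getBytes()),
            Collections.singletonList("c".getBytes())
        ));
        for (byte[] payload = sketch.next(); payload != null; payload = sketch.next()) {
          System.out.println(new String(payload));  // prints a, b, c
        }
      }
    }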