add avro + kafka + schema registry integration test (#10929)

* add avro + schema registry integration test

* style

* retry init

* maybe this

* oops heh

* this will fix it

* review stuffs

* fix comment
Clint Wylie 2021-03-08 08:12:12 -08:00 committed by GitHub
parent 9946306d4b
commit 96889cdebc
20 changed files with 436 additions and 72 deletions

View File

@@ -26,14 +26,17 @@ import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -70,18 +73,32 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
   @Override
   public GenericRecord parse(ByteBuffer bytes)
   {
+    int length = bytes.limit() - 1 - 4;
+    if (length < 0) {
+      throw new ParseException("Failed to decode avro message, not enough bytes to decode (%s)", bytes.limit());
+    }
+    bytes.get(); // ignore first \0 byte
+    int id = bytes.getInt(); // extract schema registry id
+    int offset = bytes.position() + bytes.arrayOffset();
+    Schema schema;
     try {
-      bytes.get(); // ignore first \0 byte
-      int id = bytes.getInt(); // extract schema registry id
-      int length = bytes.limit() - 1 - 4;
-      int offset = bytes.position() + bytes.arrayOffset();
       ParsedSchema parsedSchema = registry.getSchemaById(id);
-      Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
-      DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+      schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
+    }
+    catch (IOException | RestClientException ex) {
+      throw new RE(ex, "Failed to get Avro schema: %s", id);
+    }
+    if (schema == null) {
+      throw new RE("Failed to find Avro schema: %s", id);
+    }
+    DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+    try {
       return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
     }
     catch (Exception e) {
-      throw new ParseException(e, "Fail to decode avro message!");
+      throw new ParseException(e, "Fail to decode Avro message for schema: %s!", id);
     }
   }
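For reference, the framing parse() expects here is the standard Confluent wire format that the new up-front check guards: one magic byte (0x0), a four-byte schema registry id, then the Avro-encoded record, which is where bytes.limit() - 1 - 4 comes from. Below is a minimal sketch of producing that framing; the class and method names are illustrative only, but the layout matches what the tests and the new AvroSchemaRegistryEventSerializer in this change build.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Illustrative helper: frame an already-serialized Avro payload the way the decoder expects.
public class ConfluentFramingSketch
{
  static ByteBuffer frame(int schemaId, byte[] avroPayload) throws IOException
  {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0x0);                                             // magic byte
    out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // schema registry id
    out.write(avroPayload);                                     // Avro binary record
    ByteBuffer bb = ByteBuffer.wrap(out.toByteArray());
    // parse() consumes 1 magic byte + 4 id bytes, leaving limit - 1 - 4 payload bytes to decode
    return bb;
  }
}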

View File

@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.avro;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import org.apache.avro.Schema;
@@ -29,6 +30,7 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.druid.data.input.AvroStreamInputRowParserTest;
 import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,40 +98,60 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   public void testParse() throws Exception
   {
     // Given
-    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+           .thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
     GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
     Schema schema = SomeAvroDatum.getClassSchema();
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
     ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
     bb.rewind();
     // When
-    GenericRecord actual = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
-    // Then
-    Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
 
   @Test(expected = ParseException.class)
-  public void testParseCorrupted() throws Exception
+  public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo()
   {
     // Given
-    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
-    GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
-    Schema schema = SomeAvroDatum.getClassSchema();
-    byte[] bytes = getAvroDatum(schema, someAvroDatum);
-    ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put((bytes), 5, 10);
+    ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
+    bb.rewind();
     // When
     new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
 
   @Test(expected = ParseException.class)
+  public void testParseCorruptedPartial() throws Exception
+  {
+    // Given
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+           .thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
+    GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+    Schema schema = SomeAvroDatum.getClassSchema();
+    byte[] bytes = getAvroDatum(schema, someAvroDatum);
+    ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
+    bb.rewind();
+    // When
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+  }
+
+  @Test(expected = RE.class)
+  public void testParseWrongSchemaType() throws Exception
+  {
+    // Given
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class));
+    ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
+    bb.rewind();
+    // When
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+  }
+
+  @Test(expected = RE.class)
   public void testParseWrongId() throws Exception
   {
     // Given
     Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
-    GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
-    Schema schema = SomeAvroDatum.getClassSchema();
-    byte[] bytes = getAvroDatum(schema, someAvroDatum);
-    ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
+    ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
+    bb.rewind();
     // When
     new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
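The new length guard is what splits the two failure modes these tests pin down: a buffer too small to even hold the magic byte and schema id fails with ParseException before the registry is consulted, while registry or schema problems surface as RE. A small sketch of the arithmetic for the two-byte buffer used above (buffer built exactly as in the test; the class here is only for illustration):

import java.nio.ByteBuffer;

// Sketch: the length guard applied to the 2-byte buffer from testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo.
public class LengthGuardSketch
{
  public static void main(String[] args)
  {
    ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
    bb.rewind();
    int length = bb.limit() - 1 - 4;  // 2 - 1 - 4 = -3
    System.out.println(length < 0);   // true -> parse() throws ParseException up front
  }
}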

View File

@@ -34,7 +34,7 @@ networks:
       - subnet: 172.172.172.0/24
 
 services:
-### supporting infra:
+### always there supporting infra:
   druid-zookeeper-kafka:
     image: druid/cluster
     container_name: druid-zookeeper-kafka
@@ -71,45 +71,6 @@ services:
     env_file:
       - ./environment-configs/common
 
-  druid-it-hadoop:
-    image: druid-it/hadoop:2.8.5
-    container_name: druid-it-hadoop
-    ports:
-      - 2049:2049
-      - 2122:2122
-      - 8020:8020
-      - 8021:8021
-      - 8030:8030
-      - 8031:8031
-      - 8032:8032
-      - 8033:8033
-      - 8040:8040
-      - 8042:8042
-      - 8088:8088
-      - 8443:8443
-      - 9000:9000
-      - 10020:10020
-      - 19888:19888
-      - 34455:34455
-      - 50010:50010
-      - 50020:50020
-      - 50030:50030
-      - 50060:50060
-      - 50070:50070
-      - 50075:50075
-      - 50090:50090
-      - 51111:51111
-    networks:
-      druid-it-net:
-        ipv4_address: 172.172.172.101
-    privileged: true
-    volumes:
-      - ${HOME}/shared:/shared
-      - ./../src/test/resources:/resources
-    hostname: "druid-it-hadoop"
-    command: "bash -c 'echo Start druid-it-hadoop container... && \
-              /etc/bootstrap.sh && \
-              tail -f /dev/null'"
-
 ### overlords
   druid-overlord:
@@ -357,12 +318,54 @@ services:
       - ./environment-configs/common
       - ./environment-configs/router-custom-check-tls
 
+### optional supporting infra
+
+  druid-it-hadoop:
+    image: druid-it/hadoop:2.8.5
+    container_name: druid-it-hadoop
+    ports:
+      - 2049:2049
+      - 2122:2122
+      - 8020:8020
+      - 8021:8021
+      - 8030:8030
+      - 8031:8031
+      - 8032:8032
+      - 8033:8033
+      - 8040:8040
+      - 8042:8042
+      - 8088:8088
+      - 8443:8443
+      - 9000:9000
+      - 10020:10020
+      - 19888:19888
+      - 34455:34455
+      - 50010:50010
+      - 50020:50020
+      - 50030:50030
+      - 50060:50060
+      - 50070:50070
+      - 50075:50075
+      - 50090:50090
+      - 51111:51111
+    networks:
+      druid-it-net:
+        ipv4_address: 172.172.172.101
+    privileged: true
+    volumes:
+      - ${HOME}/shared:/shared
+      - ./../src/test/resources:/resources
+    hostname: "druid-it-hadoop"
+    command: "bash -c 'echo Start druid-it-hadoop container... && \
+              /etc/bootstrap.sh && \
+              tail -f /dev/null'"
+
   druid-openldap:
     image: osixia/openldap:1.4.0
     container_name: druid-openldap
     networks:
       druid-it-net:
-        ipv4_address: 172.172.172.74
+        ipv4_address: 172.172.172.102
     ports:
       - 8389:389
       - 8636:636
@@ -373,3 +376,26 @@ services:
     env_file:
       - ./environment-configs/common
     command: --copy-service
+
+  schema-registry:
+    image: confluentinc/cp-schema-registry:5.5.1
+    container_name: schema-registry
+    ports:
+      - 8085:8085
+    networks:
+      druid-it-net:
+        ipv4_address: 172.172.172.103
+    volumes:
+      - ${HOME}/shared:/shared
+      - ./schema-registry/jaas_config.file:/usr/lib/druid/conf/jaas_config.file
+      - ./schema-registry/password-file:/usr/lib/druid/conf/password-file
+    privileged: true
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: druid-zookeeper-kafka:9092
+      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
+      SCHEMA_REGISTRY_AUTHENTICATION_REALM: druid
+      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: users
+      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/usr/lib/druid/conf/jaas_config.file
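The BASIC auth wiring above (realm druid, role users, plus the mounted jaas_config.file and password-file) lines up with the credentials the test-side client supplies. A trimmed sketch of a client configured against this service, mirroring the constructor call used by the new AvroSchemaRegistryEventSerializer in this change; the host value assumes the in-network address from the compose file rather than the externally mapped port:

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Sketch: build a registry client that authenticates against the BASIC-auth registry above.
public class RegistryClientSketch
{
  static SchemaRegistryClient buildClient()
  {
    return new CachedSchemaRegistryClient(
        "http://schema-registry:8085",             // internal host:port from the compose file
        Integer.MAX_VALUE,                         // identity map capacity, as in the new serializer
        ImmutableMap.of(
            "basic.auth.credentials.source", "USER_INFO",
            "basic.auth.user.info", "druid:diurd"  // matches docker/schema-registry/password-file
        ),
        ImmutableMap.of()                          // no extra HTTP headers
    );
  }
}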

View File

@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "2.2"
services:
schema-registry:
extends:
file: docker-compose.base.yml
service: schema-registry
depends_on:
- druid-zookeeper-kafka
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
- druid-historical:druid-historical
- druid-indexer:druid-indexer

View File

@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "2.2"
services:
schema-registry:
extends:
file: docker-compose.base.yml
service: schema-registry
depends_on:
- druid-zookeeper-kafka
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
- druid-middlemanager:druid-middlemanager
- druid-historical:druid-historical

View File

@@ -0,0 +1,5 @@
druid {
  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
  file="/usr/lib/druid/conf/password-file"
  debug="true";
};

View File

@@ -0,0 +1 @@
druid: diurd,users

View File

@@ -31,6 +31,13 @@
         <version>0.22.0-SNAPSHOT</version>
     </parent>
 
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
     <dependencies>
         <dependency>
             <groupId>com.amazonaws</groupId>
@@ -320,6 +327,41 @@
             <artifactId>guice-servlet</artifactId>
             <version>${guice.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-schema-registry-client</artifactId>
+            <version>5.5.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>javax.ws.rs-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>javax.ws.rs-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>jsr311-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jakarta.ws.rs</groupId>
+                    <artifactId>jakarta.ws.rs-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
         <!-- Tests -->
         <dependency>

View File

@@ -28,7 +28,6 @@ getComposeArgs()
     echo "DRUID_INTEGRATION_TEST_INDEXER must be 'indexer' or 'middleManager' (is '$DRUID_INTEGRATION_TEST_INDEXER')"
     exit 1
   fi
-
   if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
   then
     # Sanity check: cannot combine CliIndexer tests with security, query-retry tests
@@ -36,10 +35,14 @@ getComposeArgs()
     then
       echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
       exit 1
+    elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "kafka-data-format" ]
+    then
+      # Replace MiddleManager with Indexer + schema registry container
+      echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml -f ${DOCKERDIR}/docker-compose.schema-registry-indexer.yml"
+    else
+      # Replace MiddleManager with Indexer
+      echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
     fi
-
-    # Replace MiddleManager with Indexer
-    echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
   elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
   then
     # default + additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
@@ -57,6 +60,10 @@ getComposeArgs()
   then
     # the 'high availability' test cluster with multiple coordinators and overlords
     echo "-f ${DOCKERDIR}/docker-compose.high-availability.yml"
+  elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "kafka-data-format" ]
+  then
+    # default + schema registry container
+    echo "-f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.schema-registry.yml"
   else
     # default
     echo "-f ${DOCKERDIR}/docker-compose.yml"

View File

@@ -63,6 +63,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
   private String middleManagerHost;
   private String zookeeperHosts; // comma-separated list of host:port
   private String kafkaHost;
+  private String schemaRegistryHost;
   private Map<String, String> props = null;
   private String username;
   private String password;
@@ -222,6 +223,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
     zookeeperHosts = props.get("zookeeper_hosts");
     kafkaHost = props.get("kafka_host") + ":" + props.get("kafka_port");
+    schemaRegistryHost = props.get("schema_registry_host") + ":" + props.get("schema_registry_port");
 
     username = props.get("username");
@@ -499,6 +501,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
     return streamEndpoint;
   }
 
+  @Override
+  public String getSchemaRegistryHost()
+  {
+    return schemaRegistryHost;
+  }
+
   @Override
   public Map<String, String> getProperties()
   {

View File

@@ -32,6 +32,13 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * The values here should be kept in sync with the values used in the docker-compose files used to bring up the
+ * integration-test clusters.
+ *
+ * integration-tests/docker/docker-compose.base.yml defines most of the hostnames, ports, and addresses, but some
+ * might live in the overrides as well.
+ */
 public class DockerConfigProvider implements IntegrationTestingConfigProvider
 {
   @JsonProperty
@@ -317,6 +324,18 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
     return "druid-historical";
   }
 
+  @Override
+  public String getSchemaRegistryHost()
+  {
+    return dockerIp + ":8085";
+  }
+
+  @Override
+  public String getSchemaRegistryInternalHost()
+  {
+    return "schema-registry:8085";
+  }
+
   @Override
   public String getProperty(String prop)
   {

View File

@@ -164,6 +164,13 @@ public interface IntegrationTestingConfig
   String getStreamEndpoint();
 
+  String getSchemaRegistryHost();
+
+  default String getSchemaRegistryInternalHost()
+  {
+    return getSchemaRegistryHost();
+  }
+
   boolean isDocker();
 
   @Nullable

View File

@@ -40,7 +40,7 @@ public class AvroEventSerializer implements EventSerializer
 {
   public static final String TYPE = "avro";
 
-  private static final Schema SCHEMA = SchemaBuilder
+  static final Schema SCHEMA = SchemaBuilder
       .record("wikipedia")
       .namespace("org.apache.druid")
       .fields()
@@ -62,12 +62,12 @@ public class AvroEventSerializer implements EventSerializer
       .requiredInt("delta")
       .endRecord();
 
-  private final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);
+  protected final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);
 
   @Override
   public byte[] serialize(List<Pair<String, Object>> event) throws IOException
   {
-    final WikipediaRecord record = new WikipediaRecord();
+    final WikipediaRecord record = new WikipediaRecord(SCHEMA);
     event.forEach(pair -> record.put(pair.lhs, pair.rhs));
     final ByteArrayOutputStream out = new ByteArrayOutputStream();
     final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
@@ -82,12 +82,18 @@ public class AvroEventSerializer implements EventSerializer
   {
   }
 
-  private static class WikipediaRecord implements GenericRecord
+  static class WikipediaRecord implements GenericRecord
   {
     private final Map<String, Object> event = new HashMap<>();
     private final BiMap<Integer, String> indexes = HashBiMap.create(SCHEMA.getFields().size());
     private int nextIndex = 0;
+    private final Schema schema;
+
+    public WikipediaRecord(Schema schema)
+    {
+      this.schema = schema;
+    }
 
     @Override
     public void put(String key, Object v)
@@ -125,7 +131,7 @@ public class AvroEventSerializer implements EventSerializer
     @Override
     public Schema getSchema()
     {
-      return SCHEMA;
+      return schema;
     }
   }
 }

View File

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
public class AvroSchemaRegistryEventSerializer extends AvroEventSerializer
{
private static final int MAX_INITIALIZE_RETRIES = 10;
public static final String TYPE = "avro-schema-registry";
private final IntegrationTestingConfig config;
private final CachedSchemaRegistryClient client;
private int schemaId = -1;
private Schema fromRegistry;
@JsonCreator
public AvroSchemaRegistryEventSerializer(
@JacksonInject IntegrationTestingConfig config
)
{
this.config = config;
this.client = new CachedSchemaRegistryClient(
StringUtils.format("http://%s", config.getSchemaRegistryHost()),
Integer.MAX_VALUE,
ImmutableMap.of(
"basic.auth.credentials.source", "USER_INFO",
"basic.auth.user.info", "druid:diurd"
),
ImmutableMap.of()
);
}
@Override
public void initialize(String topic)
{
try {
RetryUtils.retry(
() -> {
schemaId = client.register(topic, AvroEventSerializer.SCHEMA);
fromRegistry = client.getById(schemaId);
return 0;
},
(e) -> true,
MAX_INITIALIZE_RETRIES
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public byte[] serialize(List<Pair<String, Object>> event) throws IOException
{
final WikipediaRecord record = new WikipediaRecord(fromRegistry);
event.forEach(pair -> record.put(pair.lhs, pair.rhs));
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(0x0);
out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
DatumWriter<Object> writer = new GenericDatumWriter<>(fromRegistry);
writer.write(record, encoder);
encoder.flush();
byte[] bytes = out.toByteArray();
out.close();
return bytes;
}
}

View File

@@ -42,9 +42,15 @@ import java.util.List;
     @Type(name = JsonEventSerializer.TYPE, value = JsonEventSerializer.class),
     @Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
     @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
-    @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class)
+    @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
+    @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
 })
 public interface EventSerializer extends Closeable
 {
+  default void initialize(String topic)
+  {
+  }
+
   byte[] serialize(List<Pair<String, Object>> event) throws IOException;
 }
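The no-op default initialize(String topic) keeps the existing serializers untouched while giving the schema-registry serializer a hook to register its schema before any event is written; the SyntheticStreamGenerator change below calls it once per topic. A rough sketch of the intended call order (only the EventSerializer interface is from this change; the surrounding class, method, and the import path for EventSerializer are assumptions based on the package of the new serializer file):

import org.apache.druid.java.util.common.Pair;
import org.apache.druid.testing.utils.EventSerializer;

import java.io.IOException;
import java.util.List;

// Hypothetical driver showing the expected ordering: initialize once per topic, then serialize each event.
class SerializerCallOrderSketch
{
  static void writeAll(EventSerializer serializer, String topic, List<List<Pair<String, Object>>> events)
      throws IOException
  {
    serializer.initialize(topic);                    // e.g. register the Avro schema for this topic
    for (List<Pair<String, Object>> event : events) {
      byte[] payload = serializer.serialize(event);  // framed bytes ready for the stream writer
      // hand payload off to the Kafka producer here
    }
  }
}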

View File

@@ -61,6 +61,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
       DateTime overrrideFirstEventTime
   )
   {
+    serializer.initialize(streamTopic);
     // The idea here is that we will send [eventsPerSecond] events that will either use [nowFlooredToSecond]
     // or the [overrrideFirstEventTime] as the primary timestamp.
     // Having a fixed number of events that use the same timestamp will help in allowing us to determine if any events

View File

@@ -83,6 +83,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
         "%%TOPIC_VALUE%%",
         streamName
     );
+
     if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
       spec = StringUtils.replace(
           spec,
@@ -116,6 +117,13 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
         "%%STREAM_PROPERTIES_KEY%%",
         "consumerProperties"
     );
+
+    spec = StringUtils.replace(
+        spec,
+        "%%SCHEMA_REGISTRY_HOST%%",
+        StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
+    );
+
     return StringUtils.replace(
         spec,
         "%%STREAM_PROPERTIES_VALUE%%",

View File

@@ -0,0 +1,21 @@
{
  "type": "avro_stream",
  "avroBytesDecoder" : {
    "type": "schema_registry",
    "url": "%%SCHEMA_REGISTRY_HOST%%",
    "config": {
      "basic.auth.credentials.source": "USER_INFO",
      "basic.auth.user.info": "druid:diurd"
    }
  },
  "parseSpec": {
    "format": "avro",
    "timestampSpec": {
      "column": "timestamp",
      "format": "auto"
    },
    "dimensionsSpec": {
      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
    }
  }
}

View File

@@ -0,0 +1,3 @@
{
"type": "avro-schema-registry"
}

View File

@@ -1949,6 +1949,7 @@
             <exclude>**/*.json</exclude>
             <exclude>**/*.parq</exclude>
             <exclude>**/*.parquet</exclude>
+            <exclude>**/docker/schema-registry/*</exclude>
             <exclude>LICENSE</exclude>
             <exclude>LICENSE.BINARY</exclude>
             <exclude>NOTICE</exclude>