diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 42765fa6778..123f8fae89b 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -26,14 +26,17 @@ import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.parsers.ParseException;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -70,18 +73,32 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
   @Override
   public GenericRecord parse(ByteBuffer bytes)
   {
+    int length = bytes.limit() - 1 - 4;
+    if (length < 0) {
+      throw new ParseException("Failed to decode Avro message, not enough bytes to decode (%s)", bytes.limit());
+    }
+
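+    // Confluent wire format: one magic byte (0x0), a 4-byte schema registry id, then the
+    // Avro-encoded payload; 'length' is what remains for the payload after that 5-byte header.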
+    bytes.get(); // ignore first \0 byte
+    int id = bytes.getInt(); // extract schema registry id
+    int offset = bytes.position() + bytes.arrayOffset();
+    Schema schema;
+
     try {
-      bytes.get(); // ignore first \0 byte
-      int id = bytes.getInt(); // extract schema registry id
-      int length = bytes.limit() - 1 - 4;
-      int offset = bytes.position() + bytes.arrayOffset();
       ParsedSchema parsedSchema = registry.getSchemaById(id);
-      Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
-      DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+      schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
+    }
+    catch (IOException | RestClientException ex) {
+      throw new RE(ex, "Failed to get Avro schema: %s", id);
+    }
+    if (schema == null) {
+      throw new RE("Failed to find Avro schema: %s", id);
+    }
+    DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+    try {
       return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
     }
     catch (Exception e) {
-      throw new ParseException(e, "Fail to decode avro message!");
+      throw new ParseException(e, "Failed to decode Avro message for schema: %s", id);
     }
   }
 
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 55c7e6bd452..3eb643934fe 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -20,6 +20,7 @@ package org.apache.druid.data.input.avro;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import org.apache.avro.Schema;
@@ -29,6 +30,7 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.druid.data.input.AvroStreamInputRowParserTest;
 import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,40 +98,60 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   public void testParse() throws Exception
   {
     // Given
-    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+           .thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
     GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
     Schema schema = SomeAvroDatum.getClassSchema();
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
     ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
     bb.rewind();
     // When
-    GenericRecord actual = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
-    // Then
-    Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
 
   @Test(expected = ParseException.class)
-  public void testParseCorrupted() throws Exception
+  public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo()
   {
     // Given
-    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
-    GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
-    Schema schema = SomeAvroDatum.getClassSchema();
-    byte[] bytes = getAvroDatum(schema, someAvroDatum);
-    ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put((bytes), 5, 10);
+    ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
+    bb.rewind();
     // When
     new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
 
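+  // Valid magic byte and schema id, but a truncated Avro payload: the schema lookup succeeds
+  // while decoding fails, so this must surface as a ParseException.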
   @Test(expected = ParseException.class)
+  public void testParseCorruptedPartial() throws Exception
+  {
+    // Given
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+           .thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
+    GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+    Schema schema = SomeAvroDatum.getClassSchema();
+    byte[] bytes = getAvroDatum(schema, someAvroDatum);
+    ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
+    bb.rewind();
+    // When
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+  }
+
+  @Test(expected = RE.class)
+  public void testParseWrongSchemaType() throws Exception
+  {
+    // Given
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class));
+    ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
+    bb.rewind();
+    // When
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+  }
+
+  @Test(expected = RE.class)
   public void testParseWrongId() throws Exception
   {
     // Given
     Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
-    GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
-    Schema schema = SomeAvroDatum.getClassSchema();
-    byte[] bytes = getAvroDatum(schema, someAvroDatum);
-    ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
+    ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
+    bb.rewind();
     // When
     new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
 
diff --git a/integration-tests/docker/docker-compose.base.yml b/integration-tests/docker/docker-compose.base.yml
index 66b852691a2..46dda1d2701 100644
--- a/integration-tests/docker/docker-compose.base.yml
+++ b/integration-tests/docker/docker-compose.base.yml
@@ -34,7 +34,7 @@ networks:
       - subnet: 172.172.172.0/24
 
 services:
-### supporting infra:
+### always-present supporting infra:
   druid-zookeeper-kafka:
     image: druid/cluster
     container_name: druid-zookeeper-kafka
@@ -71,45 +71,6 @@ services:
     env_file:
       - ./environment-configs/common
 
-  druid-it-hadoop:
-    image: druid-it/hadoop:2.8.5
-    container_name: druid-it-hadoop
-    ports:
-      - 2049:2049
-      - 2122:2122
-      - 8020:8020
-      - 8021:8021
-      - 8030:8030
-      - 8031:8031
-      - 8032:8032
-      - 8033:8033
-      - 8040:8040
-      - 8042:8042
-      - 8088:8088
-      - 8443:8443
-      - 9000:9000
-      - 10020:10020
-      - 19888:19888
-      - 34455:34455
-      - 50010:50010
-      - 50020:50020
-      - 50030:50030
-      - 50060:50060
-      - 50070:50070
-      - 50075:50075
-      - 50090:50090
-      - 51111:51111
-    networks:
-      druid-it-net:
-        ipv4_address: 172.172.172.101
-    privileged: true
-    volumes:
-      - ${HOME}/shared:/shared
-      - ./../src/test/resources:/resources
-    hostname: "druid-it-hadoop"
-    command: "bash -c 'echo Start druid-it-hadoop container... && \
-          /etc/bootstrap.sh && \
-          tail -f /dev/null'"
 
 ### overlords
   druid-overlord:
@@ -357,12 +318,54 @@ services:
       - ./environment-configs/common
       - ./environment-configs/router-custom-check-tls
 
+### optional supporting infra
+  druid-it-hadoop:
+    image: druid-it/hadoop:2.8.5
+    container_name: druid-it-hadoop
+    ports:
+      - 2049:2049
+      - 2122:2122
+      - 8020:8020
+      - 8021:8021
+      - 8030:8030
+      - 8031:8031
+      - 8032:8032
+      - 8033:8033
+      - 8040:8040
+      - 8042:8042
+      - 8088:8088
+      - 8443:8443
+      - 9000:9000
+      - 10020:10020
+      - 19888:19888
+      - 34455:34455
+      - 50010:50010
+      - 50020:50020
+      - 50030:50030
+      - 50060:50060
+      - 50070:50070
+      - 50075:50075
+      - 50090:50090
+      - 51111:51111
+    networks:
+      druid-it-net:
+        ipv4_address: 172.172.172.101
+    privileged: true
+    volumes:
+      - ${HOME}/shared:/shared
+      - ./../src/test/resources:/resources
+    hostname: "druid-it-hadoop"
+    command: "bash -c 'echo Start druid-it-hadoop container... && \
+          /etc/bootstrap.sh && \
+          tail -f /dev/null'"
+
+
   druid-openldap:
     image: osixia/openldap:1.4.0
     container_name: druid-openldap
     networks:
       druid-it-net:
-        ipv4_address: 172.172.172.74
+        ipv4_address: 172.172.172.102
     ports:
       - 8389:389
       - 8636:636
@@ -373,3 +376,26 @@ services:
     env_file:
       - ./environment-configs/common
     command: --copy-service
+
+
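+  # Secured with HTTP basic auth: the mounted jaas_config.file/password-file define the
+  # druid/diurd credentials that the integration tests use against the registry.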
+  schema-registry:
+    image: confluentinc/cp-schema-registry:5.5.1
+    container_name: schema-registry
+    ports:
+      - 8085:8085
+    networks:
+      druid-it-net:
+        ipv4_address: 172.172.172.103
+    volumes:
+      - ${HOME}/shared:/shared
+      - ./schema-registry/jaas_config.file:/usr/lib/druid/conf/jaas_config.file
+      - ./schema-registry/password-file:/usr/lib/druid/conf/password-file
+    privileged: true
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: druid-zookeeper-kafka:9092
+      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
+      SCHEMA_REGISTRY_AUTHENTICATION_REALM: druid
+      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: users
+      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/usr/lib/druid/conf/jaas_config.file
diff --git a/integration-tests/docker/docker-compose.schema-registry-indexer.yml b/integration-tests/docker/docker-compose.schema-registry-indexer.yml
new file mode 100644
index 00000000000..71c38141824
--- /dev/null
+++ b/integration-tests/docker/docker-compose.schema-registry-indexer.yml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
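+# Overlay that adds the schema-registry service to the CliIndexer-based test cluster
+# (used by the kafka-data-format test group when DRUID_INTEGRATION_TEST_INDEXER=indexer).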
+
+version: "2.2"
+services:
+  schema-registry:
+    extends:
+      file: docker-compose.base.yml
+      service: schema-registry
+    depends_on:
+      - druid-zookeeper-kafka
+    links:
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+      - druid-coordinator:druid-coordinator
+      - druid-broker:druid-broker
+      - druid-historical:druid-historical
+      - druid-indexer:druid-indexer
diff --git a/integration-tests/docker/docker-compose.schema-registry.yml b/integration-tests/docker/docker-compose.schema-registry.yml
new file mode 100644
index 00000000000..5611e2abcf2
--- /dev/null
+++ b/integration-tests/docker/docker-compose.schema-registry.yml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "2.2"
+services:
+  schema-registry:
+    extends:
+      file: docker-compose.base.yml
+      service: schema-registry
+    depends_on:
+      - druid-zookeeper-kafka
+    links:
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+      - druid-coordinator:druid-coordinator
+      - druid-broker:druid-broker
+      - druid-middlemanager:druid-middlemanager
+      - druid-historical:druid-historical
diff --git a/integration-tests/docker/schema-registry/jaas_config.file b/integration-tests/docker/schema-registry/jaas_config.file
new file mode 100644
index 00000000000..dc48bed4051
--- /dev/null
+++ b/integration-tests/docker/schema-registry/jaas_config.file
@@ -0,0 +1,5 @@
+druid {
+  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+  file="/usr/lib/druid/conf/password-file"
+  debug="true";
+};
\ No newline at end of file
diff --git a/integration-tests/docker/schema-registry/password-file b/integration-tests/docker/schema-registry/password-file
new file mode 100644
index 00000000000..c850844c627
--- /dev/null
+++ b/integration-tests/docker/schema-registry/password-file
@@ -0,0 +1 @@
+druid: diurd,users
\ No newline at end of file
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index fc3b7b50eec..2fb9adc239e 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -31,6 +31,13 @@
         <version>0.22.0-SNAPSHOT</version>
 
+
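+    <!-- kafka-schema-registry-client is published to Confluent's Maven repository rather than Maven Central -->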
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
 
     <dependencies>
         <dependency>
            <groupId>com.amazonaws</groupId>
@@ -320,6 +327,41 @@
            <artifactId>guice-servlet</artifactId>
            <version>${guice.version}</version>
        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-schema-registry-client</artifactId>
+            <version>5.5.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>javax.ws.rs-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>javax.ws.rs-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>jsr311-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jakarta.ws.rs</groupId>
+                    <artifactId>jakarta.ws.rs-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
diff --git a/integration-tests/script/docker_compose_args.sh b/integration-tests/script/docker_compose_args.sh
index e43f88d7c38..ea61e88d287 100644
--- a/integration-tests/script/docker_compose_args.sh
+++ b/integration-tests/script/docker_compose_args.sh
@@ -28,7 +28,6 @@ getComposeArgs()
     echo "DRUID_INTEGRATION_TEST_INDEXER must be 'indexer' or 'middleManager' (is '$DRUID_INTEGRATION_TEST_INDEXER')"
     exit 1
   fi
-
  if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
  then
    # Sanity check: cannot combine CliIndexer tests with security, query-retry tests
@@ -36,10 +35,14 @@ getComposeArgs()
    then
      echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
      exit 1
+    elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "kafka-data-format" ]
+    then
+      # Replace MiddleManager with Indexer + schema registry container
+      echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml -f ${DOCKERDIR}/docker-compose.schema-registry-indexer.yml"
+    else
+      # Replace MiddleManager with Indexer
+      echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
    fi
-
-    # Replace MiddleManager with Indexer
-    echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
  elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
  then
    # default + additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
@@ -57,6 +60,10 @@ getComposeArgs()
  then
    # the 'high availability' test cluster with multiple coordinators and overlords
    echo "-f ${DOCKERDIR}/docker-compose.high-availability.yml"
+  elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "kafka-data-format" ]
+  then
+    # default + schema registry container
+    echo "-f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.schema-registry.yml"
  else
    # default
    echo "-f ${DOCKERDIR}/docker-compose.yml"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
index 162eda906b7..7cd0387b5fb 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
@@ -63,6 +63,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvider
   private String middleManagerHost;
   private String zookeeperHosts;        // comma-separated list of host:port
   private String kafkaHost;
+  private String schemaRegistryHost;
   private Map<String, String> props = null;
   private String username;
   private String password;
@@ -222,6 +223,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvider
     zookeeperHosts = props.get("zookeeper_hosts");
     kafkaHost = props.get("kafka_host") + ":" + props.get("kafka_port");
+    schemaRegistryHost = props.get("schema_registry_host") + ":" + props.get("schema_registry_port");
 
     username = props.get("username");
 
@@ -499,6 +501,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvider
     return streamEndpoint;
   }
 
+  @Override
+  public String getSchemaRegistryHost()
+  {
+    return schemaRegistryHost;
+  }
+
   @Override
   public Map<String, String> getProperties()
   {
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
index 8924cfc402d..bb742a99ee9 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
@@ -32,6 +32,13 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * The values here should be kept in sync with the values used in the docker-compose files used to bring up the
+ * integration-test clusters.
+ *
+ * integration-tests/docker/docker-compose.base.yml defines most of the hostnames, ports, and addresses, but some
+ * might live in the overrides as well.
+ */
 public class DockerConfigProvider implements IntegrationTestingConfigProvider
 {
   @JsonProperty
@@ -317,6 +324,18 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
     return "druid-historical";
   }
 
+  @Override
+  public String getSchemaRegistryHost()
+  {
+    return dockerIp + ":8085";
+  }
+
+  @Override
+  public String getSchemaRegistryInternalHost()
+  {
+    return "schema-registry:8085";
+  }
+
   @Override
   public String getProperty(String prop)
   {
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
index 3c1951c286b..b65507e7020 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
@@ -164,6 +164,13 @@ public interface IntegrationTestingConfig
   String getStreamEndpoint();
 
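+  /**
+   * Schema registry address reachable from the test runner (the host-mapped port); see
+   * {@link #getSchemaRegistryInternalHost()} for the address used from inside the Docker network,
+   * e.g. in ingestion specs handed to the cluster.
+   */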
+  String getSchemaRegistryHost();
+
+  default String getSchemaRegistryInternalHost()
+  {
+    return getSchemaRegistryHost();
+  }
+
   boolean isDocker();
 
   @Nullable
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
index 284fd098fc0..7457f84fbe7 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
@@ -40,7 +40,7 @@ public class AvroEventSerializer implements EventSerializer
 {
   public static final String TYPE = "avro";
 
-  private static final Schema SCHEMA = SchemaBuilder
+  static final Schema SCHEMA = SchemaBuilder
       .record("wikipedia")
       .namespace("org.apache.druid")
       .fields()
@@ -62,12 +62,12 @@ public class AvroEventSerializer implements EventSerializer
       .requiredInt("delta")
       .endRecord();
 
-  private final DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(SCHEMA);
+  protected final DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(SCHEMA);
 
   @Override
   public byte[] serialize(List<Pair<String, Object>> event) throws IOException
   {
-    final WikipediaRecord record = new WikipediaRecord();
+    final WikipediaRecord record = new WikipediaRecord(SCHEMA);
     event.forEach(pair -> record.put(pair.lhs, pair.rhs));
     final ByteArrayOutputStream out = new ByteArrayOutputStream();
     final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
@@ -82,12 +82,18 @@ public class AvroEventSerializer implements EventSerializer
   {
   }
 
-  private static class WikipediaRecord implements GenericRecord
+  static class WikipediaRecord implements GenericRecord
   {
     private final Map<String, Object> event = new HashMap<>();
     private final BiMap<String, Integer> indexes = HashBiMap.create(SCHEMA.getFields().size());
     private int nextIndex = 0;
 
+    private final Schema schema;
+
+    public WikipediaRecord(Schema schema)
+    {
+      this.schema = schema;
+    }
 
     @Override
     public void put(String key, Object v)
@@ -125,7 +131,7 @@ public class AvroEventSerializer implements EventSerializer
     @Override
     public Schema getSchema()
     {
-      return SCHEMA;
+      return schema;
     }
   }
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java
new file mode 100644
index 00000000000..dd1e82f82df
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.collect.ImmutableMap;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class AvroSchemaRegistryEventSerializer extends AvroEventSerializer
+{
+  private static final int MAX_INITIALIZE_RETRIES = 10;
+  public static final String TYPE = "avro-schema-registry";
+
+  private final IntegrationTestingConfig config;
+  private final CachedSchemaRegistryClient client;
+  private int schemaId = -1;
+
+  private Schema fromRegistry;
+
+  @JsonCreator
+  public AvroSchemaRegistryEventSerializer(
+      @JacksonInject IntegrationTestingConfig config
+  )
+  {
+    this.config = config;
+    this.client = new CachedSchemaRegistryClient(
+        StringUtils.format("http://%s", config.getSchemaRegistryHost()),
+        Integer.MAX_VALUE,
+        ImmutableMap.of(
+            "basic.auth.credentials.source", "USER_INFO",
+            "basic.auth.user.info", "druid:diurd"
+        ),
+        ImmutableMap.of()
+    );
+
+  }
+
+  @Override
+  public void initialize(String topic)
+  {
+    try {
+      RetryUtils.retry(
+          () -> {
+            schemaId = client.register(topic, AvroEventSerializer.SCHEMA);
+            fromRegistry = client.getById(schemaId);
+            return 0;
+          },
+          (e) -> true,
+          MAX_INITIALIZE_RETRIES
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public byte[] serialize(List<Pair<String, Object>> event) throws IOException
+  {
+    final WikipediaRecord record = new WikipediaRecord(fromRegistry);
+    event.forEach(pair -> record.put(pair.lhs, pair.rhs));
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
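+    // Write the Confluent wire format that the schema_registry avroBytesDecoder expects:
+    // magic byte 0x0, the 4-byte schema id, then the Avro-encoded record.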
+    out.write(0x0);
+    out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
+    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+    DatumWriter writer = new GenericDatumWriter<>(fromRegistry);
+    writer.write(record, encoder);
+    encoder.flush();
+    byte[] bytes = out.toByteArray();
+    out.close();
+    return bytes;
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
index 014d8c80e66..cad5acf79e6 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
@@ -42,9 +42,15 @@ import java.util.List;
     @Type(name = JsonEventSerializer.TYPE, value = JsonEventSerializer.class),
     @Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
     @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
-    @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class)
+    @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
+    @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
 })
 public interface EventSerializer extends Closeable
 {
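+  /**
+   * Called once per stream/topic before any events are generated; serializers that need per-topic
+   * setup (such as registering a schema with the schema registry) override this. The default is a no-op.
+   */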
+  default void initialize(String topic)
+  {
+
+  }
+
   byte[] serialize(List<Pair<String, Object>> event) throws IOException;
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
index cf69ccd7d91..bd9c1b8922e 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
@@ -61,6 +61,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
       DateTime overrrideFirstEventTime
   )
   {
+    serializer.initialize(streamTopic);
     // The idea here is that we will send [eventsPerSecond] events that will either use [nowFlooredToSecond]
     // or the [overrrideFirstEventTime] as the primary timestamp.
     // Having a fixed number of events that use the same timestamp will help in allowing us to determine if any events
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 204b6ef7259..5ea11e6992c 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -83,6 +83,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
         "%%TOPIC_VALUE%%",
         streamName
     );
+
     if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
       spec = StringUtils.replace(
           spec,
@@ -116,6 +117,13 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
         "%%STREAM_PROPERTIES_KEY%%",
         "consumerProperties"
     );
+
+    spec = StringUtils.replace(
+        spec,
+        "%%SCHEMA_REGISTRY_HOST%%",
+        StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
+    );
+
     return StringUtils.replace(
         spec,
         "%%STREAM_PROPERTIES_VALUE%%",
diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json
new file mode 100644
index 00000000000..c48871ab949
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json
@@ -0,0 +1,21 @@
+{
+  "type": "avro_stream",
+  "avroBytesDecoder" : {
+    "type": "schema_registry",
+    "url": "%%SCHEMA_REGISTRY_HOST%%",
+    "config": {
+      "basic.auth.credentials.source": "USER_INFO",
+      "basic.auth.user.info": "druid:diurd"
+    }
+  },
+  "parseSpec": {
+    "format": "avro",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json
new file mode 100644
index 00000000000..5251ade30e7
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+  "type": "avro-schema-registry"
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 157aec2acad..10ddf028e7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1949,6 +1949,7 @@
                         <exclude>**/*.json</exclude>
                         <exclude>**/*.parq</exclude>
                         <exclude>**/*.parquet</exclude>
+                        <exclude>**/docker/schema-registry/*</exclude>
                         <exclude>LICENSE</exclude>
                         <exclude>LICENSE.BINARY</exclude>
                         <exclude>NOTICE</exclude>