Add config and header support for confluent schema registry. (#10314)

* Add config and header support for confluent schema registry. (porting code from https://github.com/apache/druid/pull/9096)

* Add Eclipse Public License 2.0 to license check

* Update licenses.yaml, revert changes to check-licenses.py and dependencies for integration-tests

* Add spelling exception and remove unused dependency

* Use non-deprecated getSchemaById() and remove duplicated license entry

* Update docs/ingestion/data-formats.md

Co-authored-by: Clint Wylie <cjwylie@gmail.com>

* Added check for schema being null, as per Confluent code

* Missing imports and whitespace

* Updated unit tests with AvroSchema

Co-authored-by: Sergio Spinatelli <sergio.spinatelli.extern@7-tv.de>
Co-authored-by: Sergio Spinatelli <sergio.spinatelli.extern@joyn.de>
Co-authored-by: Clint Wylie <cjwylie@gmail.com>
This commit is contained in:
spinatelli 2021-02-27 23:25:35 +01:00 committed by GitHub
parent 573de3bc0d
commit 99198c02af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 208 additions and 10 deletions

View File

@ -216,6 +216,7 @@ def build_compatible_license_names():
compatible_licenses['Apache 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache-2.0'] = 'Apache License version 2.0' compatible_licenses['Apache-2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2'] = 'Apache License version 2.0' compatible_licenses['Apache 2'] = 'Apache License version 2.0'
compatible_licenses['Apache License 2'] = 'Apache License version 2.0'
compatible_licenses['Apache License 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache Software License - Version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache Software License - Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['The Apache License, Version 2.0'] = 'Apache License version 2.0' compatible_licenses['The Apache License, Version 2.0'] = 'Apache License version 2.0'
@ -223,6 +224,7 @@ def build_compatible_license_names():
compatible_licenses['Apache License Version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License Version 2'] = 'Apache License version 2.0' compatible_licenses['Apache License Version 2'] = 'Apache License version 2.0'
compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0' compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0'
@ -260,6 +262,14 @@ def build_compatible_license_names():
compatible_licenses['Eclipse Public License - Version 1.0'] = 'Eclipse Public License 1.0' compatible_licenses['Eclipse Public License - Version 1.0'] = 'Eclipse Public License 1.0'
compatible_licenses['Eclipse Public License, Version 1.0'] = 'Eclipse Public License 1.0' compatible_licenses['Eclipse Public License, Version 1.0'] = 'Eclipse Public License 1.0'
compatible_licenses['Eclipse Public License v1.0'] = 'Eclipse Public License 1.0' compatible_licenses['Eclipse Public License v1.0'] = 'Eclipse Public License 1.0'
compatible_licenses['EPL 1.0'] = 'Eclipse Public License 1.0'
compatible_licenses['Eclipse Public License 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['The Eclipse Public License, Version 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['Eclipse Public License - Version 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['Eclipse Public License, Version 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['Eclipse Public License v2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['EPL 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['Eclipse Distribution License 1.0'] = 'Eclipse Distribution License 1.0' compatible_licenses['Eclipse Distribution License 1.0'] = 'Eclipse Distribution License 1.0'
compatible_licenses['Eclipse Distribution License - v 1.0'] = 'Eclipse Distribution License 1.0' compatible_licenses['Eclipse Distribution License - v 1.0'] = 'Eclipse Distribution License 1.0'

View File

@ -1016,7 +1016,13 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
| type | String | This should say `schema_registry`. | no | | type | String | This should say `schema_registry`. | no |
| url | String | Specifies the url endpoint of the Schema Registry. | yes | | url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) |
| config | Json | To send additional configurations, configured for Schema Registry | no |
| headers | Json | To send headers to the Schema Registry | no |
For a single schema registry instance, use Field `url` or `urls` for multi instances.
Single Instance:
```json ```json
... ...
"avroBytesDecoder" : { "avroBytesDecoder" : {
@ -1026,6 +1032,31 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
... ...
``` ```
Multiple Instances:
```json
...
"avroBytesDecoder" : {
"type" : "schema_registry",
"urls" : [<schema-registry-url-1>, <schema-registry-url-2>, ...],
"config" : {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "fred:letmein",
"schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks",
"schema.registry.ssl.truststore.password": "<password>",
"schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
"schema.registry.ssl.keystore.password": "<password>",
"schema.registry.ssl.key.password": "<password>"
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
...
}
}
...
```
### Protobuf Parser ### Protobuf Parser
> You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser. > You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser.

View File

@ -35,7 +35,7 @@
<properties> <properties>
<schemarepo.version>0.1.3</schemarepo.version> <schemarepo.version>0.1.3</schemarepo.version>
<confluent.version>3.0.1</confluent.version> <confluent.version>5.5.1</confluent.version>
</properties> </properties>
<repositories> <repositories>
@ -169,6 +169,22 @@
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
</exclusion> </exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -22,6 +22,8 @@ package org.apache.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema; import org.apache.avro.Schema;
@ -31,7 +33,10 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DecoderFactory;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
@ -40,12 +45,19 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
@JsonCreator @JsonCreator
public SchemaRegistryBasedAvroBytesDecoder( public SchemaRegistryBasedAvroBytesDecoder(
@JsonProperty("url") String url, @JsonProperty("url") @Deprecated String url,
@JsonProperty("capacity") Integer capacity @JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
@JsonProperty("config") @Nullable Map<String, ?> config,
@JsonProperty("headers") @Nullable Map<String, String> headers
) )
{ {
int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity;
this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity); if (url != null && !url.isEmpty()) {
this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers);
} else {
this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers);
}
} }
//For UT only //For UT only
@ -63,7 +75,8 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
int id = bytes.getInt(); // extract schema registry id int id = bytes.getInt(); // extract schema registry id
int length = bytes.limit() - 1 - 4; int length = bytes.limit() - 1 - 4;
int offset = bytes.position() + bytes.arrayOffset(); int offset = bytes.position() + bytes.arrayOffset();
Schema schema = registry.getByID(id); ParsedSchema parsedSchema = registry.getSchemaById(id);
Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
} }

View File

@ -19,6 +19,8 @@
package org.apache.druid.data.input.avro; package org.apache.druid.data.input.avro;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
@ -48,11 +50,53 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
registry = Mockito.mock(SchemaRegistryClient.class); registry = Mockito.mock(SchemaRegistryClient.class);
} }
@Test
public void testMultipleUrls() throws Exception
{
String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
ObjectMapper mapper = new ObjectMapper();
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
}
@Test
public void testUrl() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
ObjectMapper mapper = new ObjectMapper();
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
}
@Test
public void testConfig() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
ObjectMapper mapper = new ObjectMapper();
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
}
@Test @Test
public void testParse() throws Exception public void testParse() throws Exception
{ {
// Given // Given
Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema(); Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum); byte[] bytes = getAvroDatum(schema, someAvroDatum);
@ -68,7 +112,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testParseCorrupted() throws Exception public void testParseCorrupted() throws Exception
{ {
// Given // Given
Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema()); Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema(); Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum); byte[] bytes = getAvroDatum(schema, someAvroDatum);
@ -81,7 +125,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testParseWrongId() throws Exception public void testParseWrongId() throws Exception
{ {
// Given // Given
Mockito.when(registry.getByID(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema(); Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum); byte[] bytes = getAvroDatum(schema, someAvroDatum);

View File

@ -3382,12 +3382,94 @@ notices:
--- ---
name: Kafka Schema Registry Client name: Kafka Schema Registry Client
version: 3.0.1 version: 5.5.1
license_category: binary license_category: binary
module: extensions/druid-avro-extensions module: extensions/druid-avro-extensions
license_name: Apache License version 2.0 license_name: Apache License version 2.0
libraries: libraries:
- io.confluent: kafka-schema-registry-client - io.confluent: kafka-schema-registry-client
- io.confluent: common-config
- io.confluent: common-utils
---
name: Kafka Client
version: 5.5.1-ccs
license_category: binary
module: extensions/druid-avro-extensions
license_name: Apache License version 2.0
libraries:
- org.apache.kafka: kafka-clients
---
name: swagger-annotations
version: 1.6.0
license_category: binary
module: extensions/druid-avro-extensions
license_name: Apache License version 2.0
libraries:
- io.swagger: swagger-annotations
---
name: jersey-common
version: '2.30'
license_category: binary
module: extensions/druid-avro-extensions
license_name: Apache License version 2.0
libraries:
- org.glassfish.jersey.core: jersey-common
---
name: osgi-resource-locator
version: 1.0.3
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- org.glassfish.hk2: osgi-resource-locator
---
name: jakarta.inject
version: 2.6.1
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- org.glassfish.hk2.external: jakarta.inject
---
name: jakarta.annotation
version: 1.3.5
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- jakarta.annotation: jakarta.annotation-api
---
name: javax.ws.rs-api
version: 2.1.1
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- javax.ws.rs: javax.ws.rs-api
---
name: jakarta.ws.rs-api
version: 2.1.6
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- jakarta.ws.rs: jakarta.ws.rs-api
--- ---

View File

@ -417,6 +417,7 @@ untrusted
useFilterCNF useFilterCNF
uptime uptime
uris uris
urls
useFieldDiscovery useFieldDiscovery
v1 v1
v2 v2
@ -938,6 +939,7 @@ ctrl
jsonLowercase jsonLowercase
listDelimiter listDelimiter
timestampSpec timestampSpec
urls
- ../docs/ingestion/data-management.md - ../docs/ingestion/data-management.md
1GB 1GB
IOConfig IOConfig