diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py
index 0d3f83b0255..d8e96769e0f 100755
--- a/distribution/bin/check-licenses.py
+++ b/distribution/bin/check-licenses.py
@@ -216,6 +216,7 @@ def build_compatible_license_names():
     compatible_licenses['Apache 2.0'] = 'Apache License version 2.0'
     compatible_licenses['Apache-2.0'] = 'Apache License version 2.0'
     compatible_licenses['Apache 2'] = 'Apache License version 2.0'
+    compatible_licenses['Apache License 2'] = 'Apache License version 2.0'
     compatible_licenses['Apache License 2.0'] = 'Apache License version 2.0'
     compatible_licenses['Apache Software License - Version 2.0'] = 'Apache License version 2.0'
     compatible_licenses['The Apache License, Version 2.0'] = 'Apache License version 2.0'
@@ -223,6 +224,7 @@ def build_compatible_license_names():
     compatible_licenses['Apache License Version 2.0'] = 'Apache License version 2.0'
     compatible_licenses['Apache License Version 2'] = 'Apache License version 2.0'
     compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0'
+    compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0'
     compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0'
 
     compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0'
@@ -260,6 +262,14 @@ def build_compatible_license_names():
     compatible_licenses['Eclipse Public License - Version 1.0'] = 'Eclipse Public License 1.0'
     compatible_licenses['Eclipse Public License, Version 1.0'] = 'Eclipse Public License 1.0'
     compatible_licenses['Eclipse Public License v1.0'] = 'Eclipse Public License 1.0'
+    compatible_licenses['EPL 1.0'] = 'Eclipse Public License 1.0'
+
+    compatible_licenses['Eclipse Public License 2.0'] = 'Eclipse Public License 2.0'
+    compatible_licenses['The Eclipse Public License, Version 2.0'] = 'Eclipse Public License 2.0'
+    compatible_licenses['Eclipse Public License - Version 2.0'] = 'Eclipse Public License 2.0'
+    compatible_licenses['Eclipse Public License, Version 2.0'] = 'Eclipse Public License 2.0'
+    compatible_licenses['Eclipse Public License v2.0'] = 'Eclipse Public License 2.0'
+    compatible_licenses['EPL 2.0'] = 'Eclipse Public License 2.0'
 
     compatible_licenses['Eclipse Distribution License 1.0'] = 'Eclipse Distribution License 1.0'
     compatible_licenses['Eclipse Distribution License - v 1.0'] = 'Eclipse Distribution License 1.0'
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index 2c667817d92..96f5d924b75 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -1016,7 +1016,13 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
 | type | String | This should say `schema_registry`. | no |
 | url | String | Specifies the url endpoint of the Schema Registry. | yes |
 | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
+| urls | Array | Specifies the URL endpoints of multiple Schema Registry instances. | yes (if `url` is not provided) |
+| config | Json | Additional configuration properties to pass to the Schema Registry client. | no |
+| headers | Json | HTTP headers to send to the Schema Registry. | no |
 
+Use the `url` field for a single Schema Registry instance, or the `urls` field for multiple instances.
+
+Single Instance:
 ```json
 ...
 "avroBytesDecoder" : {
@@ -1026,6 +1032,31 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
 ...
 ```
 
+Multiple Instances:
+```json
+...
+"avroBytesDecoder" : { + "type" : "schema_registry", + "urls" : [, , ...], + "config" : { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "fred:letmein", + "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", + "schema.registry.ssl.truststore.password": "", + "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", + "schema.registry.ssl.keystore.password": "", + "schema.registry.ssl.key.password": "" + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + } +} +... +``` + ### Protobuf Parser > You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser. diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml index 6b46a15ec7a..b07c4f4252c 100644 --- a/extensions-core/avro-extensions/pom.xml +++ b/extensions-core/avro-extensions/pom.xml @@ -35,7 +35,7 @@ 0.1.3 - 3.0.1 + 5.5.1 @@ -169,6 +169,22 @@ com.fasterxml.jackson.core jackson-databind + + javax.ws.rs + javax.ws.rs-api + + + javax.ws.rs + javax.ws.rs-api + + + javax.ws.rs + jsr311-api + + + jakarta.ws.rs + jakarta.ws.rs-api + diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 6ff97c41857..42765fa6778 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -22,6 +22,8 @@ package org.apache.druid.data.input.avro; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.apache.avro.Schema; @@ -31,7 +33,10 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; import org.apache.druid.java.util.common.parsers.ParseException; +import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import java.util.Objects; public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder @@ -40,12 +45,19 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder @JsonCreator public SchemaRegistryBasedAvroBytesDecoder( - @JsonProperty("url") String url, - @JsonProperty("capacity") Integer capacity + @JsonProperty("url") @Deprecated String url, + @JsonProperty("capacity") Integer capacity, + @JsonProperty("urls") @Nullable List urls, + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers ) { int identityMapCapacity = capacity == null ? 
     int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity;
-    this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity);
+    if (url != null && !url.isEmpty()) {
+      this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers);
+    } else {
+      this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers);
+    }
   }
 
   //For UT only
@@ -63,7 +75,8 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
     int id = bytes.getInt(); // extract schema registry id
     int length = bytes.limit() - 1 - 4;
     int offset = bytes.position() + bytes.arrayOffset();
-    Schema schema = registry.getByID(id);
+    ParsedSchema parsedSchema = registry.getSchemaById(id);
+    Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
     DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
     return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
   }
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index f5f1776a36f..55c7e6bd452 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.data.input.avro;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -48,11 +50,53 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
     registry = Mockito.mock(SchemaRegistryClient.class);
   }
 
+  @Test
+  public void testMultipleUrls() throws Exception
+  {
+    String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
+    ObjectMapper mapper = new ObjectMapper();
+    SchemaRegistryBasedAvroBytesDecoder decoder;
+    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
+        .readerFor(AvroBytesDecoder.class)
+        .readValue(json);
+
+    // Then
+    Assert.assertNotEquals(decoder.hashCode(), 0);
+  }
+
+  @Test
+  public void testUrl() throws Exception
+  {
+    String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
+    ObjectMapper mapper = new ObjectMapper();
+    SchemaRegistryBasedAvroBytesDecoder decoder;
+    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
+        .readerFor(AvroBytesDecoder.class)
+        .readValue(json);
+
+    // Then
+    Assert.assertNotEquals(decoder.hashCode(), 0);
+  }
+
+  @Test
+  public void testConfig() throws Exception
+  {
+    String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
+    ObjectMapper mapper = new ObjectMapper();
+    SchemaRegistryBasedAvroBytesDecoder decoder;
+    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
+        .readerFor(AvroBytesDecoder.class)
+        .readValue(json);
+
+    // Then
+    Assert.assertNotEquals(decoder.hashCode(), 0);
+  }
+
   @Test
   public void testParse() throws Exception
   {
     // Given
-    Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
     GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
     Schema schema = SomeAvroDatum.getClassSchema();
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
@@ -68,7 +112,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   public void testParseCorrupted() throws Exception
   {
     // Given
-    Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
     GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
     Schema schema = SomeAvroDatum.getClassSchema();
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
@@ -81,7 +125,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   public void testParseWrongId() throws Exception
   {
     // Given
-    Mockito.when(registry.getByID(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
     GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
     Schema schema = SomeAvroDatum.getClassSchema();
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
diff --git a/licenses.yaml b/licenses.yaml
index ef235ad3f90..73b6f962490 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -13,7 +13,7 @@
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
-# under the License.
+# under the License.
 
 name: conjunctive normal form conversion code, a variance aggregator algorithm, and Bloom filter adapted from Apache Hive
 version:
@@ -3382,15 +3382,97 @@ notices:
 
 ---
 
 name: Kafka Schema Registry Client
-version: 3.0.1
+version: 5.5.1
 license_category: binary
 module: extensions/druid-avro-extensions
 license_name: Apache License version 2.0
 libraries:
   - io.confluent: kafka-schema-registry-client
+  - io.confluent: common-config
+  - io.confluent: common-utils
 
 ---
 
+name: Kafka Client
+version: 5.5.1-ccs
+license_category: binary
+module: extensions/druid-avro-extensions
+license_name: Apache License version 2.0
+libraries:
+  - org.apache.kafka: kafka-clients
+
+---
+
+name: swagger-annotations
+version: 1.6.0
+license_category: binary
+module: extensions/druid-avro-extensions
+license_name: Apache License version 2.0
+libraries:
+  - io.swagger: swagger-annotations
+
+---
+
+name: jersey-common
+version: '2.30'
+license_category: binary
+module: extensions/druid-avro-extensions
+license_name: Apache License version 2.0
+libraries:
+  - org.glassfish.jersey.core: jersey-common
+
+---
+
+name: osgi-resource-locator
+version: 1.0.3
+license_category: binary
+module: extensions/druid-avro-extensions
+license_name: Eclipse Public License 2.0
+libraries:
+  - org.glassfish.hk2: osgi-resource-locator
+
+---
+
+name: jakarta.inject
+version: 2.6.1
+license_category: binary
+module: extensions/druid-avro-extensions
+license_name: Eclipse Public License 2.0
+libraries:
+  - org.glassfish.hk2.external: jakarta.inject
+
+---
+
+name: jakarta.annotation
+version: 1.3.5
+license_category: binary
+module: extensions/druid-avro-extensions
+license_name: Eclipse Public License 2.0
+libraries:
+  - jakarta.annotation: jakarta.annotation-api
+
+---
+
+name: javax.ws.rs-api
+version: 2.1.1
+license_category: binary
+module: extensions/druid-avro-extensions
+license_name: Eclipse Public License 2.0
+libraries:
+  - javax.ws.rs: javax.ws.rs-api
+
+---
+
+name: jakarta.ws.rs-api
+version: 2.1.6
+license_category: binary
+module: extensions/druid-avro-extensions
+license_name: Eclipse Public License 2.0
+libraries:
+  - jakarta.ws.rs: jakarta.ws.rs-api
+
+---
+
 name: Apache Velocity Engine
 version: 2.2
 license_category: binary
diff --git a/website/.spelling b/website/.spelling
index e4cdaac495c..7b6e38aaa5c 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -417,6 +417,7 @@ untrusted
 useFilterCNF
 uptime
 uris
+urls
 useFieldDiscovery
 v1
 v2
@@ -938,6 +939,7 @@ ctrl
 jsonLowercase
 listDelimiter
 timestampSpec
+urls
 - ../docs/ingestion/data-management.md
 1GB
 IOConfig
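
For reference, a minimal standalone sketch (not part of the patch) of the client wiring this change introduces: the multi-URL `CachedSchemaRegistryClient` constructor taken in the new `urls`/`config`/`headers` path, plus the `getSchemaById()`/`AvroSchema` lookup that replaces `getByID()` in `parse()`. The endpoint, capacity, header values, and schema id below are hypothetical placeholders, and actually running it assumes a reachable Schema Registry.

```java
// Illustrative only: mirrors the decoder changes above, outside of Druid.
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SchemaRegistrySketch
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical registry endpoint and request header; any number of base urls can be passed.
    List<String> urls = Collections.singletonList("http://localhost:8081");
    Map<String, String> headers = Collections.singletonMap("traceID", "b29c5de2-0db4-490b-b421");

    // Same constructor shape the decoder now uses when `url` is absent and `urls` is set.
    SchemaRegistryClient registry =
        new CachedSchemaRegistryClient(urls, Integer.MAX_VALUE, Collections.emptyMap(), headers);

    // Same lookup the reworked parse() performs: resolve a ParsedSchema by id,
    // then unwrap it to an Avro Schema (non-Avro schema types yield null here).
    ParsedSchema parsedSchema = registry.getSchemaById(1234); // hypothetical schema id
    Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
    System.out.println(schema);
  }
}
```

Keeping `url` accepted but marked `@Deprecated` preserves backward compatibility for existing ingestion specs while `urls` becomes the preferred field for new ones.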