From aa7cb50f24a7e1a2c2506310fe3468c60f16766c Mon Sep 17 00:00:00 2001 From: Yi Yuan <269081523@qq.com> Date: Wed, 4 Aug 2021 04:24:52 +0800 Subject: [PATCH] Add DynamicConfigProvider for Schema Registry (#11362) * add_DynamicConfigProvider_for_schema_registry * bug fixed * add document * fix document * fix spot bug * fix document * inject ObjectMapper * add DynamicConfigProviderUtils * add UT * bug fixed Co-authored-by: yuanyi --- .../utils/DynamicConfigProviderUtils.java | 73 ++++++++++++++++ .../utils/DynamicConfigProviderUtilsTest.java | 84 +++++++++++++++++++ docs/ingestion/data-formats.md | 25 ++++-- .../SchemaRegistryBasedAvroBytesDecoder.java | 25 ++++-- .../data/input/AvroStreamInputFormatTest.java | 6 +- ...hemaRegistryBasedAvroBytesDecoderTest.java | 64 +++++++++++++- ...hemaRegistryBasedProtobufBytesDecoder.java | 25 ++++-- .../protobuf/ProtobufInputFormatTest.java | 6 +- ...RegistryBasedProtobufBytesDecoderTest.java | 68 +++++++++++++-- 9 files changed, 345 insertions(+), 31 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java create mode 100644 core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java diff --git a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java new file mode 100644 index 00000000000..4c45262aba0 --- /dev/null +++ b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.metadata.DynamicConfigProvider; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class DynamicConfigProviderUtils +{ + public static Map extraConfigAndSetStringMap(Map config, String dynamicConfigProviderKey, ObjectMapper mapper) + { + HashMap newConfig = new HashMap<>(); + if (config != null) { + for (Map.Entry entry : config.entrySet()) { + if (!dynamicConfigProviderKey.equals(entry.getKey())) { + newConfig.put(entry.getKey(), entry.getValue().toString()); + } + } + Map dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper); + for (Map.Entry entry : dynamicConfig.entrySet()) { + newConfig.put(entry.getKey(), entry.getValue()); + } + } + return newConfig; + } + + public static Map extraConfigAndSetObjectMap(Map config, String dynamicConfigProviderKey, ObjectMapper mapper) + { + HashMap newConfig = new HashMap<>(); + if (config != null) { + for (Map.Entry entry : config.entrySet()) { + if (!dynamicConfigProviderKey.equals(entry.getKey())) { + newConfig.put(entry.getKey(), entry.getValue()); + } + } + Map dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper); + for (Map.Entry entry : dynamicConfig.entrySet()) { + newConfig.put(entry.getKey(), entry.getValue()); + } + } + return newConfig; + } + + private static Map extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper) + { + if (dynamicConfigProviderJson != null) { + DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + return dynamicConfigProvider.getConfig(); + } + return Collections.emptyMap(); + } +} diff --git a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java new file mode 100644 index 00000000000..496acfabef3 --- /dev/null +++ b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; + +import java.util.Map; + +@RunWith(Enclosed.class) +public class DynamicConfigProviderUtilsTest +{ + public static class ThrowIfURLHasNotAllowedPropertiesTest + { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final String DYNAMIC_CONFIG_PROVIDER = "druid.dynamic.config.provider"; + + @Test + public void testExtraConfigAndSetStringMap() + { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("prop2", "value2") + ); + + Map properties = ImmutableMap.of( + "prop1", "value1", + "prop2", "value3", + DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + Map res = DynamicConfigProviderUtils.extraConfigAndSetStringMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER); + + Assert.assertEquals(2, res.size()); + Assert.assertEquals("value1", res.get("prop1")); + Assert.assertEquals("value2", res.get("prop2")); + } + + @Test + public void testExtraConfigAndSetObjectMap() + { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("prop2", "value2") + ); + + Map properties = ImmutableMap.of( + "prop1", "value1", + "prop2", "value3", + DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + Map res = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER); + + Assert.assertEquals(2, res.size()); + Assert.assertEquals("value1", res.get("prop1").toString()); + Assert.assertEquals("value2", res.get("prop2").toString()); + } + } +} diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 002d6d24b7e..cb1b3d29f34 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -380,8 +380,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry | no | -| headers | Json | To send headers to the Schema Registry | no | +| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | +| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. @@ -408,12 +408,20 @@ Multiple Instances: "schema.registry.ssl.truststore.password": "", "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", "schema.registry.ssl.keystore.password": "", - "schema.registry.ssl.key.password": "" + "schema.registry.ssl.key.password": "", + "schema.registry.ssl.key.password", ... }, "headers": { "traceID" : "b29c5de2-0db4-490b-b421", "timeStamp" : "1577191871865", + "druid.dynamic.config.provider":{ + "type":"mapString", + "config":{ + "registry.header.prop.1":"value.1", + "registry.header.prop.2":"value.2" + } + } ... } } @@ -1223,8 +1231,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | url | String | Specifies the url endpoint of the Schema Registry. | yes | | capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | | urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry | no | -| headers | Json | To send headers to the Schema Registry | no | +| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md). | no | +| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. @@ -1259,6 +1267,13 @@ Multiple Instances: "headers": { "traceID" : "b29c5de2-0db4-490b-b421", "timeStamp" : "1577191871865", + "druid.dynamic.config.provider":{ + "type":"mapString", + "config":{ + "registry.header.prop.1":"value.1", + "registry.header.prop.2":"value.2" + } + } ... } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 4b3da38c49c..59cb33e35f4 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -19,8 +19,10 @@ package org.apache.druid.data.input.avro; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; @@ -32,8 +34,10 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; +import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.DynamicConfigProviderUtils; import javax.annotation.Nullable; import java.io.IOException; @@ -48,16 +52,19 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder private final String url; private final int capacity; private final List urls; - private final Map config; - private final Map headers; + private final Map config; + private final Map headers; + private final ObjectMapper jsonMapper; + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonCreator public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("url") @Deprecated String url, @JsonProperty("capacity") Integer capacity, @JsonProperty("urls") @Nullable List urls, - @JsonProperty("config") @Nullable Map config, - @JsonProperty("headers") @Nullable Map headers + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers, + @JacksonInject @Json ObjectMapper jsonMapper ) { this.url = url; @@ -65,10 +72,11 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder this.urls = urls; this.config = config; this.headers = headers; + this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, this.config, this.headers); + this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); } else { - this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, this.config, this.headers); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); } } @@ -91,13 +99,13 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder } @JsonProperty - public Map getConfig() + public Map getConfig() { return config; } @JsonProperty - public Map getHeaders() + public Map getHeaders() { return headers; } @@ -112,6 +120,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder this.config = null; this.headers = null; this.registry = registry; + this.jsonMapper = new ObjectMapper(); } @Override diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java index 4213008ba2b..4a0c2df1cac 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; @@ -108,6 +109,9 @@ public class AvroStreamInputFormatTest for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) { jsonMapper.registerModule(jacksonModule); } + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); } @Test @@ -151,7 +155,7 @@ public class AvroStreamInputFormatTest { AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( flattenSpec, - new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null), + new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null), false, false ); diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 3eb643934fe..ec073c92750 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -19,6 +19,8 @@ package org.apache.druid.data.input.avro; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; @@ -30,8 +32,10 @@ import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.DynamicConfigProviderUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,6 +45,7 @@ import org.mockito.Mockito; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Map; public class SchemaRegistryBasedAvroBytesDecoderTest { @@ -56,7 +61,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest public void testMultipleUrls() throws Exception { String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedAvroBytesDecoder decoder; decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper .readerFor(AvroBytesDecoder.class) @@ -70,7 +78,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest public void testUrl() throws Exception { String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedAvroBytesDecoder decoder; decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper .readerFor(AvroBytesDecoder.class) @@ -84,7 +95,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest public void testConfig() throws Exception { String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedAvroBytesDecoder decoder; decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper .readerFor(AvroBytesDecoder.class) @@ -163,4 +177,48 @@ public class SchemaRegistryBasedAvroBytesDecoderTest writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); return out.toByteArray(); } + + @Test + public void testParseHeader() throws JsonProcessingException + { + String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}"; + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); + SchemaRegistryBasedAvroBytesDecoder decoder; + decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper + .readerFor(AvroBytesDecoder.class) + .readValue(json); + + Map header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + + // Then + Assert.assertEquals(3, header.size()); + Assert.assertEquals("value.1", header.get("registry.header.prop.1")); + Assert.assertEquals("value.2", header.get("registry.header.prop.2")); + Assert.assertEquals("value.3", header.get("registry.header.prop.3")); + } + + @Test + public void testParseConfig() throws JsonProcessingException + { + String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}"; + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); + SchemaRegistryBasedAvroBytesDecoder decoder; + decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper + .readerFor(AvroBytesDecoder.class) + .readValue(json); + + Map config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + + // Then + Assert.assertEquals(3, config.size()); + Assert.assertEquals("value.1", config.get("registry.config.prop.1")); + Assert.assertEquals("value.2", config.get("registry.config.prop.2")); + Assert.assertEquals("value.3", config.get("registry.config.prop.3")); + } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index 2d4cc8dd555..17bb85a59ed 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -19,8 +19,10 @@ package org.apache.druid.data.input.protobuf; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -29,8 +31,10 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; +import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.DynamicConfigProviderUtils; import javax.annotation.Nullable; @@ -50,16 +54,19 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec private final String url; private final int capacity; private final List urls; - private final Map config; - private final Map headers; + private final Map config; + private final Map headers; + private final ObjectMapper jsonMapper; + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonCreator public SchemaRegistryBasedProtobufBytesDecoder( @JsonProperty("url") @Deprecated String url, @JsonProperty("capacity") Integer capacity, @JsonProperty("urls") @Nullable List urls, - @JsonProperty("config") @Nullable Map config, - @JsonProperty("headers") @Nullable Map headers + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers, + @JacksonInject @Json ObjectMapper jsonMapper ) { this.url = url; @@ -67,10 +74,11 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec this.urls = urls; this.config = config; this.headers = headers; + this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers); + this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); } else { - this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); } } @@ -93,13 +101,13 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec } @JsonProperty - public Map getConfig() + public Map getConfig() { return config; } @JsonProperty - public Map getHeaders() + public Map getHeaders() { return headers; } @@ -119,6 +127,7 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec this.config = null; this.headers = null; this.registry = registry; + this.jsonMapper = new ObjectMapper(); } @Override diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index e9e15ffe260..0a3e8c7f7d5 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.protobuf; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; @@ -77,6 +78,9 @@ public class ProtobufInputFormatTest for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) { jsonMapper.registerModule(jacksonModule); } + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); } @Test @@ -99,7 +103,7 @@ public class ProtobufInputFormatTest { ProtobufInputFormat inputFormat = new ProtobufInputFormat( flattenSpec, - new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null) + new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null, null) ); NestedInputFormat inputFormat2 = jsonMapper.readValue( jsonMapper.writeValueAsString(inputFormat), diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index 0d77b11ec4a..009b5a60b07 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -19,6 +19,8 @@ package org.apache.druid.data.input.protobuf; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.protobuf.DynamicMessage; @@ -26,7 +28,9 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import org.apache.commons.io.IOUtils; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.DynamicConfigProviderUtils; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; import org.junit.Assert; @@ -40,6 +44,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.Map; public class SchemaRegistryBasedProtobufBytesDecoderTest { @@ -93,7 +98,7 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest public void testDefaultCapacity() { // Given - SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null); + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null, null); // When Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), Integer.MAX_VALUE); } @@ -103,7 +108,7 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest { int capacity = 100; // Given - SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null); + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null, null); // When Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), capacity); } @@ -120,7 +125,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest public void testMultipleUrls() throws Exception { String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedProtobufBytesDecoder decoder; decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper .readerFor(ProtobufBytesDecoder.class) @@ -134,7 +142,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest public void testUrl() throws Exception { String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedProtobufBytesDecoder decoder; decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper .readerFor(ProtobufBytesDecoder.class) @@ -148,7 +159,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest public void testConfig() throws Exception { String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}"; - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); SchemaRegistryBasedProtobufBytesDecoder decoder; decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper .readerFor(ProtobufBytesDecoder.class) @@ -158,6 +172,50 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest Assert.assertNotEquals(decoder.hashCode(), 0); } + @Test + public void testParseHeader() throws JsonProcessingException + { + String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}"; + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + Map header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + + // Then + Assert.assertEquals(3, header.size()); + Assert.assertEquals("value.1", header.get("registry.header.prop.1")); + Assert.assertEquals("value.2", header.get("registry.header.prop.2")); + Assert.assertEquals("value.3", header.get("registry.header.prop.3")); + } + + @Test + public void testParseConfig() throws JsonProcessingException + { + String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}"; + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + Map heaeder = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(decoder.getConfig(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + + // Then + Assert.assertEquals(3, heaeder.size()); + Assert.assertEquals("value.1", heaeder.get("registry.config.prop.1")); + Assert.assertEquals("value.2", heaeder.get("registry.config.prop.2")); + Assert.assertEquals("value.3", heaeder.get("registry.config.prop.3")); + } + private ProtobufSchema parseProtobufSchema() throws IOException { // Given