Add DynamicConfigProvider for Schema Registry (#11362)

* add_DynamicConfigProvider_for_schema_registry

* bug fixed

* add document

* fix document

* fix spot bug

* fix document

* inject ObjectMapper

* add DynamicConfigProviderUtils

* add UT

* bug fixed

Co-authored-by: yuanyi <yuanyi@freewheel.tv>
This commit is contained in:
Yi Yuan 2021-08-04 04:24:52 +08:00 committed by GitHub
parent 55a01a030a
commit aa7cb50f24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 345 additions and 31 deletions

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.metadata.DynamicConfigProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class DynamicConfigProviderUtils
{
public static Map<String, String> extraConfigAndSetStringMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
{
HashMap<String, String> newConfig = new HashMap<>();
if (config != null) {
for (Map.Entry<String, Object> entry : config.entrySet()) {
if (!dynamicConfigProviderKey.equals(entry.getKey())) {
newConfig.put(entry.getKey(), entry.getValue().toString());
}
}
Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
newConfig.put(entry.getKey(), entry.getValue());
}
}
return newConfig;
}
public static Map<String, Object> extraConfigAndSetObjectMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
{
HashMap<String, Object> newConfig = new HashMap<>();
if (config != null) {
for (Map.Entry<String, Object> entry : config.entrySet()) {
if (!dynamicConfigProviderKey.equals(entry.getKey())) {
newConfig.put(entry.getKey(), entry.getValue());
}
}
Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
newConfig.put(entry.getKey(), entry.getValue());
}
}
return newConfig;
}
private static Map<String, String> extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper)
{
if (dynamicConfigProviderJson != null) {
DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
return dynamicConfigProvider.getConfig();
}
return Collections.emptyMap();
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import java.util.Map;
@RunWith(Enclosed.class)
public class DynamicConfigProviderUtilsTest
{
public static class ThrowIfURLHasNotAllowedPropertiesTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final String DYNAMIC_CONFIG_PROVIDER = "druid.dynamic.config.provider";
@Test
public void testExtraConfigAndSetStringMap()
{
DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
ImmutableMap.of("prop2", "value2")
);
Map<String, Object> properties = ImmutableMap.of(
"prop1", "value1",
"prop2", "value3",
DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class)
);
Map<String, String> res = DynamicConfigProviderUtils.extraConfigAndSetStringMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER);
Assert.assertEquals(2, res.size());
Assert.assertEquals("value1", res.get("prop1"));
Assert.assertEquals("value2", res.get("prop2"));
}
@Test
public void testExtraConfigAndSetObjectMap()
{
DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider(
ImmutableMap.of("prop2", "value2")
);
Map<String, Object> properties = ImmutableMap.of(
"prop1", "value1",
"prop2", "value3",
DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class)
);
Map<String, Object> res = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(properties, DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER);
Assert.assertEquals(2, res.size());
Assert.assertEquals("value1", res.get("prop1").toString());
Assert.assertEquals("value2", res.get("prop2").toString());
}
}
}

View File

@ -380,8 +380,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) |
| config | Json | To send additional configurations, configured for Schema Registry | no |
| headers | Json | To send headers to the Schema Registry | no |
| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
For a single schema registry instance, use Field `url` or `urls` for multi instances.
@ -408,12 +408,20 @@ Multiple Instances:
"schema.registry.ssl.truststore.password": "<password>",
"schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
"schema.registry.ssl.keystore.password": "<password>",
"schema.registry.ssl.key.password": "<password>"
"schema.registry.ssl.key.password": "<password>",
"schema.registry.ssl.key.password",
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
"druid.dynamic.config.provider":{
"type":"mapString",
"config":{
"registry.header.prop.1":"value.1",
"registry.header.prop.2":"value.2"
}
}
...
}
}
@ -1223,8 +1231,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) |
| config | Json | To send additional configurations, configured for Schema Registry | no |
| headers | Json | To send headers to the Schema Registry | no |
| config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md). | no |
| headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
For a single schema registry instance, use Field `url` or `urls` for multi instances.
@ -1259,6 +1267,13 @@ Multiple Instances:
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
"druid.dynamic.config.provider":{
"type":"mapString",
"config":{
"registry.header.prop.1":"value.1",
"registry.header.prop.2":"value.2"
}
}
...
}
}

View File

@ -19,8 +19,10 @@
package org.apache.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@ -32,8 +34,10 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@ -48,16 +52,19 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
private final String url;
private final int capacity;
private final List<String> urls;
private final Map<String, ?> config;
private final Map<String, String> headers;
private final Map<String, Object> config;
private final Map<String, Object> headers;
private final ObjectMapper jsonMapper;
public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
@JsonCreator
public SchemaRegistryBasedAvroBytesDecoder(
@JsonProperty("url") @Deprecated String url,
@JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
@JsonProperty("config") @Nullable Map<String, ?> config,
@JsonProperty("headers") @Nullable Map<String, String> headers
@JsonProperty("config") @Nullable Map<String, Object> config,
@JsonProperty("headers") @Nullable Map<String, Object> headers,
@JacksonInject @Json ObjectMapper jsonMapper
)
{
this.url = url;
@ -65,10 +72,11 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
this.urls = urls;
this.config = config;
this.headers = headers;
this.jsonMapper = jsonMapper;
if (url != null && !url.isEmpty()) {
this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, this.config, this.headers);
this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
} else {
this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, this.config, this.headers);
this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
}
}
@ -91,13 +99,13 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
}
@JsonProperty
public Map<String, ?> getConfig()
public Map<String, Object> getConfig()
{
return config;
}
@JsonProperty
public Map<String, String> getHeaders()
public Map<String, Object> getHeaders()
{
return headers;
}
@ -112,6 +120,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
this.config = null;
this.headers = null;
this.registry = registry;
this.jsonMapper = new ObjectMapper();
}
@Override

View File

@ -19,6 +19,7 @@
package org.apache.druid.data.input;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@ -108,6 +109,9 @@ public class AvroStreamInputFormatTest
for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
jsonMapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
}
@Test
@ -151,7 +155,7 @@ public class AvroStreamInputFormatTest
{
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
flattenSpec,
new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null),
new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null),
false,
false
);

View File

@ -19,6 +19,8 @@
package org.apache.druid.data.input.avro;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@ -30,8 +32,10 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -41,6 +45,7 @@ import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
public class SchemaRegistryBasedAvroBytesDecoderTest
{
@ -56,7 +61,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testMultipleUrls() throws Exception
{
String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
ObjectMapper mapper = new ObjectMapper();
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@ -70,7 +78,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testUrl() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
ObjectMapper mapper = new ObjectMapper();
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@ -84,7 +95,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testConfig() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
ObjectMapper mapper = new ObjectMapper();
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@ -163,4 +177,48 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
return out.toByteArray();
}
@Test
public void testParseHeader() throws JsonProcessingException
{
String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
// Then
Assert.assertEquals(3, header.size());
Assert.assertEquals("value.1", header.get("registry.header.prop.1"));
Assert.assertEquals("value.2", header.get("registry.header.prop.2"));
Assert.assertEquals("value.3", header.get("registry.header.prop.3"));
}
@Test
public void testParseConfig() throws JsonProcessingException
{
String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
// Then
Assert.assertEquals(3, config.size());
Assert.assertEquals("value.1", config.get("registry.config.prop.1"));
Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
}
}

View File

@ -19,8 +19,10 @@
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
@ -29,8 +31,10 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import javax.annotation.Nullable;
@ -50,16 +54,19 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec
private final String url;
private final int capacity;
private final List<String> urls;
private final Map<String, ?> config;
private final Map<String, String> headers;
private final Map<String, Object> config;
private final Map<String, Object> headers;
private final ObjectMapper jsonMapper;
public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
@JsonCreator
public SchemaRegistryBasedProtobufBytesDecoder(
@JsonProperty("url") @Deprecated String url,
@JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
@JsonProperty("config") @Nullable Map<String, ?> config,
@JsonProperty("headers") @Nullable Map<String, String> headers
@JsonProperty("config") @Nullable Map<String, Object> config,
@JsonProperty("headers") @Nullable Map<String, Object> headers,
@JacksonInject @Json ObjectMapper jsonMapper
)
{
this.url = url;
@ -67,10 +74,11 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec
this.urls = urls;
this.config = config;
this.headers = headers;
this.jsonMapper = jsonMapper;
if (url != null && !url.isEmpty()) {
this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers);
this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
} else {
this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers);
this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
}
}
@ -93,13 +101,13 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec
}
@JsonProperty
public Map<String, ?> getConfig()
public Map<String, Object> getConfig()
{
return config;
}
@JsonProperty
public Map<String, String> getHeaders()
public Map<String, Object> getHeaders()
{
return headers;
}
@ -119,6 +127,7 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec
this.config = null;
this.headers = null;
this.registry = registry;
this.jsonMapper = new ObjectMapper();
}
@Override

View File

@ -19,6 +19,7 @@
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
@ -77,6 +78,9 @@ public class ProtobufInputFormatTest
for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
jsonMapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
}
@Test
@ -99,7 +103,7 @@ public class ProtobufInputFormatTest
{
ProtobufInputFormat inputFormat = new ProtobufInputFormat(
flattenSpec,
new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null)
new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null, null)
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(inputFormat),

View File

@ -19,6 +19,8 @@
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DynamicMessage;
@ -26,7 +28,9 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.commons.io.IOUtils;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
@ -40,6 +44,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
public class SchemaRegistryBasedProtobufBytesDecoderTest
{
@ -93,7 +98,7 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
public void testDefaultCapacity()
{
// Given
SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null);
SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null, null);
// When
Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), Integer.MAX_VALUE);
}
@ -103,7 +108,7 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
{
int capacity = 100;
// Given
SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null);
SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null, null);
// When
Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), capacity);
}
@ -120,7 +125,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
public void testMultipleUrls() throws Exception
{
String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
ObjectMapper mapper = new ObjectMapper();
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@ -134,7 +142,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
public void testUrl() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
ObjectMapper mapper = new ObjectMapper();
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@ -148,7 +159,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
public void testConfig() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
ObjectMapper mapper = new ObjectMapper();
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@ -158,6 +172,50 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
Assert.assertNotEquals(decoder.hashCode(), 0);
}
@Test
public void testParseHeader() throws JsonProcessingException
{
String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
.readValue(json);
Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
// Then
Assert.assertEquals(3, header.size());
Assert.assertEquals("value.1", header.get("registry.header.prop.1"));
Assert.assertEquals("value.2", header.get("registry.header.prop.2"));
Assert.assertEquals("value.3", header.get("registry.header.prop.3"));
}
@Test
public void testParseConfig() throws JsonProcessingException
{
String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
.readValue(json);
Map<String, ?> heaeder = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(decoder.getConfig(), SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
// Then
Assert.assertEquals(3, heaeder.size());
Assert.assertEquals("value.1", heaeder.get("registry.config.prop.1"));
Assert.assertEquals("value.2", heaeder.get("registry.config.prop.2"));
Assert.assertEquals("value.3", heaeder.get("registry.config.prop.3"));
}
private ProtobufSchema parseProtobufSchema() throws IOException
{
// Given