diff --git a/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java b/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java new file mode 100644 index 00000000000..52f032a0a19 --- /dev/null +++ b/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.guice.annotations.ExtensionPoint; + +import java.util.Map; + +/** + * This is used to get [secure] configuration in various places in an extensible way. + */ +@ExtensionPoint +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MapStringDynamicConfigProvider.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "mapString", value = MapStringDynamicConfigProvider.class), +}) +public interface DynamicConfigProvider +{ + Map getConfig(); +} diff --git a/core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java b/core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java new file mode 100644 index 00000000000..1ef5a15e8a9 --- /dev/null +++ b/core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class MapStringDynamicConfigProvider implements DynamicConfigProvider +{ + private final ImmutableMap config; + + @JsonCreator + public MapStringDynamicConfigProvider( + @JsonProperty("config") Map config + ) + { + this.config = ImmutableMap.copyOf(config); + } + + + @Override + @JsonProperty + public Map getConfig() + { + return config; + } +} diff --git a/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java b/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java index cd3a01c429f..ab7ec7c2287 100644 --- a/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java +++ b/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java @@ -26,7 +26,13 @@ import org.apache.druid.guice.annotations.ExtensionPoint; /** * Implement this for different ways to (optionally securely) access secrets. + * + * Any further use case of extensible configuration/secrets must use {@link DynamicConfigProvider} interface. Users + * may still implement this interface for existing use cases till https://github.com/apache/druid/issues/9351 is + * resolved. + * */ +@Deprecated @ExtensionPoint @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultPasswordProvider.class) @JsonSubTypes(value = { diff --git a/core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java b/core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java new file mode 100644 index 00000000000..cdf46e12b36 --- /dev/null +++ b/core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +public class MapStringDynamicConfigProviderTest +{ + @Test + public void testSerde() throws Exception + { + DynamicConfigProvider original = new MapStringDynamicConfigProvider(ImmutableMap.of("k", "v")); + + ObjectMapper jsonMapper = new ObjectMapper(); + + MapStringDynamicConfigProvider recreated = (MapStringDynamicConfigProvider) jsonMapper.readValue( + jsonMapper.writeValueAsString(original), + DynamicConfigProvider.class + ); + + Assert.assertEquals(1, recreated.getConfig().size()); + Assert.assertEquals("v", recreated.getConfig().get("k")); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 3ffe7fa83ec..ad870f3ef00 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.PasswordProvider; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -192,16 +193,30 @@ public class KafkaRecordSupplier implements RecordSupplier // Extract passwords before SSL connection to Kafka for (Map.Entry entry : consumerProperties.entrySet()) { String propertyKey = entry.getKey(); - if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY) - || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY) - || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) { - PasswordProvider configPasswordProvider = configMapper.convertValue( - entry.getValue(), - PasswordProvider.class - ); - properties.setProperty(propertyKey, configPasswordProvider.getPassword()); - } else { - properties.setProperty(propertyKey, String.valueOf(entry.getValue())); + + if (!KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(propertyKey)) { + if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY) + || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY) + || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) { + PasswordProvider configPasswordProvider = configMapper.convertValue( + entry.getValue(), + PasswordProvider.class + ); + properties.setProperty(propertyKey, configPasswordProvider.getPassword()); + } else { + properties.setProperty(propertyKey, String.valueOf(entry.getValue())); + } + } + } + + // Additional DynamicConfigProvider based extensible support for all consumer properties + Object dynamicConfigProviderJson = consumerProperties.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY); + if (dynamicConfigProviderJson != null) { + DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + Map dynamicConfig = dynamicConfigProvider.getConfig(); + + for (Map.Entry e : dynamicConfig.entrySet()) { + properties.setProperty(e.getKey(), e.getValue()); } } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index a1360b5c63f..62c1e790657 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -32,6 +32,7 @@ import java.util.Map; public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig { + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers"; public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password"; public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password"; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index d1580335232..ca152210f57 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -24,10 +24,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.curator.test.TestingCluster; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.kafka.test.TestBroker; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.apache.druid.segment.TestHelper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -43,6 +46,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -582,6 +586,33 @@ public class KafkaRecordSupplierTest Assert.assertEquals(new Long(0), recordSupplier.getEarliestSequenceNumber(streamPartition)); } + @Test + public void testAddConsumerPropertiesFromConfig() + { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("kafka.prop.2", "value.2", KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY, "pwd2") + ); + + Properties properties = new Properties(); + + Map consumerProperties = ImmutableMap.of( + KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY, "pwd1", + "kafka.prop.1", "value.1", + "druid.dynamic.config.provider", OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + + KafkaRecordSupplier.addConsumerPropertiesFromConfig( + properties, + OBJECT_MAPPER, + consumerProperties + ); + + Assert.assertEquals(3, properties.size()); + Assert.assertEquals("value.1", properties.getProperty("kafka.prop.1")); + Assert.assertEquals("value.2", properties.getProperty("kafka.prop.2")); + Assert.assertEquals("pwd2", properties.getProperty(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)); + } + private void insertData() throws ExecutionException, InterruptedException { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {