From ab7b4798cc2747bb68550b67f69660965a276dc8 Mon Sep 17 00:00:00 2001
From: Atul Mohan
Date: Thu, 11 Oct 2018 12:03:01 -0500
Subject: [PATCH] Securing passwords used for SSL connections to Kafka (#6285)

* Secure credentials in consumer properties

* Merge master

* Refactor property population into separate method

* Fix property setter

* Fix tests
---
 .../extensions-core/kafka-ingestion.md        |  2 +-
 .../druid/indexing/kafka/KafkaIOConfig.java   |  6 ++--
 .../druid/indexing/kafka/KafkaIndexTask.java  | 32 ++++++++++++++++---
 .../kafka/supervisor/KafkaSupervisor.java     |  7 ++--
 .../supervisor/KafkaSupervisorIOConfig.java   |  9 ++++--
 .../indexing/kafka/KafkaIndexTaskTest.java    | 12 ++++---
 .../KafkaSupervisorIOConfigTest.java          | 28 ++++++++++++++++
 .../kafka/supervisor/KafkaSupervisorTest.java |  5 +--
 .../druid/indexing/kafka/test/TestBroker.java |  4 +--
 9 files changed, 81 insertions(+), 24 deletions(-)

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index 568fc94fe30..12bd5f637de 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -166,7 +166,7 @@ For Roaring bitmaps:
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
 |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
-|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.|yes|
+|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.html) or String password.|yes|
 |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against node failure.|no (default == 1)|
 |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
 |`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|
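The documentation row above promises that the SSL password properties accept either a plain String or a Password Provider object. For context, here is a rough sketch of how such a `consumerProperties` map is resolved into Kafka `Properties` by the helper this patch adds to `KafkaIndexTask`; the broker address, keystore path, class name and password values are placeholders and not part of the patch.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.jackson.DefaultObjectMapper;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SecuredConsumerPropertiesSketch
{
  public static void main(String[] args)
  {
    Map<String, Object> consumerProperties = new HashMap<>();
    consumerProperties.put("bootstrap.servers", "broker1:9092,broker2:9092");
    consumerProperties.put("security.protocol", "SSL");
    consumerProperties.put("ssl.truststore.location", "/path/to/truststore.jks");
    // Password keys may be plain strings ...
    consumerProperties.put("ssl.key.password", "mykeypassword");
    // ... or PasswordProvider JSON objects, e.g. the "default" provider used in the new unit test below
    consumerProperties.put(
        "ssl.truststore.password",
        ImmutableMap.of("type", "default", "password", "mytruststorepassword")
    );

    ObjectMapper mapper = new DefaultObjectMapper();
    Properties props = new Properties();
    // The helper added in this patch resolves the password entries into plain strings
    KafkaIndexTask.addConsumerPropertiesFromConfig(props, mapper, consumerProperties);

    System.out.println(props.getProperty("ssl.truststore.password")); // mytruststorepassword
    System.out.println(props.getProperty("ssl.key.password"));        // mykeypassword
  }
}
```

The same helper is reused by `KafkaSupervisor` further down, so supervisor-side consumers resolve the passwords the same way as the indexing tasks.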
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
index 3c60449e410..6a9af7fcea9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
@@ -39,7 +39,7 @@ public class KafkaIOConfig implements IOConfig
   private final String baseSequenceName;
   private final KafkaPartitions startPartitions;
   private final KafkaPartitions endPartitions;
-  private final Map<String, String> consumerProperties;
+  private final Map<String, Object> consumerProperties;
   private final boolean useTransaction;
   private final Optional<DateTime> minimumMessageTime;
   private final Optional<DateTime> maximumMessageTime;
@@ -51,7 +51,7 @@ public class KafkaIOConfig implements IOConfig
       @JsonProperty("baseSequenceName") String baseSequenceName,
       @JsonProperty("startPartitions") KafkaPartitions startPartitions,
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
-      @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+      @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@@ -114,7 +114,7 @@ public class KafkaIOConfig implements IOConfig
   }
 
   @JsonProperty
-  public Map<String, String> getConsumerProperties()
+  public Map<String, Object> getConsumerProperties()
   {
     return consumerProperties;
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 0e362e2e7e9..bb73651e6e8 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -41,10 +42,12 @@ import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.PasswordProvider;
 import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -92,6 +95,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   private final KafkaIOConfig ioConfig;
   private final Optional<ChatHandlerProvider> chatHandlerProvider;
   private final KafkaIndexTaskRunner runner;
+  private final ObjectMapper configMapper;
 
   // This value can be tuned in some tests
   private long pollRetryMs = 30000;
@@ -106,7 +110,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       @JsonProperty("context") Map<String, Object> context,
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
       @JacksonInject AuthorizerMapper authorizerMapper,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject ObjectMapper configMapper
   )
   {
     super(
@@ -122,6 +127,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
     this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
+    this.configMapper = configMapper;
     final CircularBuffer<Throwable> savedParseExceptions;
     if (tuningConfig.getMaxSavedParseExceptions() > 0) {
       savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions());
@@ -198,7 +204,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   }
 
-
   @Override
   public TaskStatus run(final TaskToolbox toolbox)
   {
@@ -285,9 +290,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
     final Properties props = new Properties();
 
-    for (Map.Entry<String, String> entry : ioConfig.getConsumerProperties().entrySet()) {
-      props.setProperty(entry.getKey(), entry.getValue());
-    }
+    addConsumerPropertiesFromConfig(props, configMapper, ioConfig.getConsumerProperties());
 
     props.setProperty("enable.auto.commit", "false");
     props.setProperty("auto.offset.reset", "none");
@@ -301,6 +304,25 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     }
   }
 
+  public static void addConsumerPropertiesFromConfig(Properties properties, ObjectMapper configMapper, Map<String, Object> consumerProperties)
+  {
+    // Extract passwords before SSL connection to Kafka
+    for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
+      String propertyKey = entry.getKey();
+      if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
+          || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
+          || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
+        PasswordProvider configPasswordProvider = configMapper.convertValue(
+            entry.getValue(),
+            PasswordProvider.class
+        );
+        properties.setProperty(propertyKey, configPasswordProvider.getPassword());
+      } else {
+        properties.setProperty(propertyKey, String.valueOf(entry.getValue()));
+      }
+    }
+  }
+
   static void assignPartitions(
       final KafkaConsumer consumer,
       final String topic,
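The `convertValue(...)` call in `addConsumerPropertiesFromConfig` above is what makes both input forms work for the three password keys: a PasswordProvider JSON object (as used for the truststore and keystore passwords in the new test further down) or a plain String (as used for `ssl.key.password` there). Below is a minimal illustration of that dual behaviour, assuming Druid's standard `DefaultObjectMapper`; the class is illustrative only and not part of the patch.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.PasswordProvider;

public class PasswordProviderConversionSketch
{
  public static void main(String[] args)
  {
    ObjectMapper mapper = new DefaultObjectMapper();

    // Object form: an explicit provider type plus its fields
    PasswordProvider fromObject = mapper.convertValue(
        ImmutableMap.of("type", "default", "password", "secret"),
        PasswordProvider.class
    );

    // String form: shorthand accepted for the same keys, per the docs change above
    PasswordProvider fromString = mapper.convertValue("secret", PasswordProvider.class);

    // Both yield the same resolved password
    System.out.println(fromObject.getPassword()); // secret
    System.out.println(fromString.getPassword()); // secret
  }
}
```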
StringUtils.format("kafka-supervisor-%s", RealtimeIndexTask.makeRandomId())); - props.putAll(ioConfig.getConsumerProperties()); + KafkaIndexTask.addConsumerPropertiesFromConfig(props, sortingMapper, ioConfig.getConsumerProperties()); props.setProperty("enable.auto.commit", "false"); @@ -1918,7 +1918,7 @@ public class KafkaSupervisor implements Supervisor } TaskGroup group = taskGroups.get(groupId); - Map consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); + Map consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull(); DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull(); @@ -1960,7 +1960,8 @@ public class KafkaSupervisor implements Supervisor context, null, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + sortingMapper ); Optional taskQueue = taskMaster.getTaskQueue(); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index b02458fd9d4..44c2bb2d6f7 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -32,12 +32,15 @@ import java.util.Map; public class KafkaSupervisorIOConfig { public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers"; + public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password"; + public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password"; + public static final String KEY_PASSWORD_KEY = "ssl.key.password"; private final String topic; private final Integer replicas; private final Integer taskCount; private final Duration taskDuration; - private final Map consumerProperties; + private final Map consumerProperties; private final Duration startDelay; private final Duration period; private final boolean useEarliestOffset; @@ -52,7 +55,7 @@ public class KafkaSupervisorIOConfig @JsonProperty("replicas") Integer replicas, @JsonProperty("taskCount") Integer taskCount, @JsonProperty("taskDuration") Period taskDuration, - @JsonProperty("consumerProperties") Map consumerProperties, + @JsonProperty("consumerProperties") Map consumerProperties, @JsonProperty("startDelay") Period startDelay, @JsonProperty("period") Period period, @JsonProperty("useEarliestOffset") Boolean useEarliestOffset, @@ -110,7 +113,7 @@ public class KafkaSupervisorIOConfig } @JsonProperty - public Map getConsumerProperties() + public Map getConsumerProperties() { return consumerProperties; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index cd44d68b700..6dd210ae4c5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -486,7 +486,7 @@ public class KafkaIndexTaskTest kafkaProducer.send(record).get(); } } - Map consumerProps = kafkaServer.consumerProperties(); + Map consumerProps = 
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
     final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -581,7 +581,7 @@ public class KafkaIndexTaskTest
       kafkaProducer.send(records.get(i)).get();
     }
 
-    Map<String, String> consumerProps = kafkaServer.consumerProperties();
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
     final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -698,7 +698,7 @@ public class KafkaIndexTaskTest
         kafkaProducer.send(record).get();
      }
     }
-    Map<String, String> consumerProps = kafkaServer.consumerProperties();
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
     final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -2027,7 +2027,8 @@ public class KafkaIndexTaskTest
         context,
         null,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        objectMapper
     );
     task.setPollRetryMs(POLL_RETRY_MS);
     return task;
@@ -2073,7 +2074,8 @@ public class KafkaIndexTaskTest
         context,
         null,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        objectMapper
     );
     task.setPollRetryMs(POLL_RETRY_MS);
     return task;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 5a7df1c9509..a7dc2041b8a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.kafka.KafkaIndexTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.hamcrest.CoreMatchers;
@@ -32,6 +33,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.Properties;
+
 public class KafkaSupervisorIOConfigTest
 {
   private final ObjectMapper mapper;
@@ -119,6 +122,31 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
   }
 
+  @Test
+  public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
+  {
+    String jsonStr = "{\n"
+                     + " \"type\": \"kafka\",\n"
+                     + " \"topic\": \"my-topic\",\n"
+                     + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\",\n"
+                     + " \"ssl.truststore.password\":{\"type\": \"default\", \"password\": \"mytruststorepassword\"},\n"
+                     + " \"ssl.keystore.password\":{\"type\": \"default\", \"password\": \"mykeystorepassword\"},\n"
+                     + " \"ssl.key.password\":\"mykeypassword\"}\n"
+                     + "}";
+
+    KafkaSupervisorIOConfig config = mapper.readValue(
+        jsonStr, KafkaSupervisorIOConfig.class
+    );
+    Properties props = new Properties();
+    KafkaIndexTask.addConsumerPropertiesFromConfig(props, mapper, config.getConsumerProperties());
+
+    Assert.assertEquals("my-topic", config.getTopic());
+    Assert.assertEquals("localhost:9092", props.getProperty("bootstrap.servers"));
props.getProperty("ssl.truststore.password")); + Assert.assertEquals("mykeystorepassword", props.getProperty("ssl.keystore.password")); + Assert.assertEquals("mykeypassword", props.getProperty("ssl.key.password")); + } + @Test public void testTopicRequired() throws Exception { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index d5b048a239c..c4e24f185cc 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2583,7 +2583,7 @@ public class KafkaSupervisorTest extends EasyMockSupport String kafkaHost ) { - Map consumerProperties = new HashMap<>(); + Map consumerProperties = new HashMap<>(); consumerProperties.put("myCustomKey", "myCustomValue"); consumerProperties.put("bootstrap.servers", kafkaHost); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( @@ -2711,7 +2711,8 @@ public class KafkaSupervisorTest extends EasyMockSupport Collections.emptyMap(), null, null, - rowIngestionMetersFactory + rowIngestionMetersFactory, + objectMapper ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java index 561276e6ed3..c1a06716a3c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java @@ -110,9 +110,9 @@ public class TestBroker implements Closeable return props; } - public Map consumerProperties() + public Map consumerProperties() { - final Map props = Maps.newHashMap(); + final Map props = Maps.newHashMap(); props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort())); props.put("key.deserializer", ByteArrayDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName());