Securing passwords used for SSL connections to Kafka (#6285)

* Secure credentials in consumer properties

* Merge master

* Refactor property population into separate method

* Fix property setter

* Fix tests
This commit is contained in:
Atul Mohan 2018-10-11 12:03:01 -05:00 committed by Himanshu
parent f8f4526b16
commit ab7b4798cc
9 changed files with 81 additions and 24 deletions

View File

@ -166,7 +166,7 @@ For Roaring bitmaps:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|`consumerProperties`|Map<String, String>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.html) or String password.|yes|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against node failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|

View File

@ -39,7 +39,7 @@ public class KafkaIOConfig implements IOConfig
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
private final KafkaPartitions endPartitions;
private final Map<String, String> consumerProperties;
private final Map<String, Object> consumerProperties;
private final boolean useTransaction;
private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime;
@ -51,7 +51,7 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("baseSequenceName") String baseSequenceName,
@JsonProperty("startPartitions") KafkaPartitions startPartitions,
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@ -114,7 +114,7 @@ public class KafkaIOConfig implements IOConfig
}
@JsonProperty
public Map<String, String> getConsumerProperties()
public Map<String, Object> getConsumerProperties()
{
return consumerProperties;
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@ -41,10 +42,12 @@ import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@ -92,6 +95,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private final KafkaIOConfig ioConfig;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final KafkaIndexTaskRunner runner;
private final ObjectMapper configMapper;
// This value can be tuned in some tests
private long pollRetryMs = 30000;
@ -106,7 +110,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@JsonProperty("context") Map<String, Object> context,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject ObjectMapper configMapper
)
{
super(
@ -122,6 +127,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.configMapper = configMapper;
final CircularBuffer<Throwable> savedParseExceptions;
if (tuningConfig.getMaxSavedParseExceptions() > 0) {
savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions());
@ -198,7 +204,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
@Override
public TaskStatus run(final TaskToolbox toolbox)
{
@ -285,9 +290,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
final Properties props = new Properties();
for (Map.Entry<String, String> entry : ioConfig.getConsumerProperties().entrySet()) {
props.setProperty(entry.getKey(), entry.getValue());
}
addConsumerPropertiesFromConfig(props, configMapper, ioConfig.getConsumerProperties());
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "none");
@ -301,6 +304,25 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
public static void addConsumerPropertiesFromConfig(Properties properties, ObjectMapper configMapper, Map<String, Object> consumerProperties)
{
// Extract passwords before SSL connection to Kafka
for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
String propertyKey = entry.getKey();
if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
|| propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
|| propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
PasswordProvider configPasswordProvider = configMapper.convertValue(
entry.getValue(),
PasswordProvider.class
);
properties.setProperty(propertyKey, configPasswordProvider.getPassword());
} else {
properties.setProperty(propertyKey, String.valueOf(entry.getValue()));
}
}
}
static void assignPartitions(
final KafkaConsumer consumer,
final String topic,

View File

@ -1018,7 +1018,7 @@ public class KafkaSupervisor implements Supervisor
props.setProperty("metadata.max.age.ms", "10000");
props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s", RealtimeIndexTask.makeRandomId()));
props.putAll(ioConfig.getConsumerProperties());
KafkaIndexTask.addConsumerPropertiesFromConfig(props, sortingMapper, ioConfig.getConsumerProperties());
props.setProperty("enable.auto.commit", "false");
@ -1918,7 +1918,7 @@ public class KafkaSupervisor implements Supervisor
}
TaskGroup group = taskGroups.get(groupId);
Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
Map<String, Object> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
@ -1960,7 +1960,8 @@ public class KafkaSupervisor implements Supervisor
context,
null,
null,
rowIngestionMetersFactory
rowIngestionMetersFactory,
sortingMapper
);
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();

View File

@ -32,12 +32,15 @@ import java.util.Map;
public class KafkaSupervisorIOConfig
{
public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
public static final String KEY_PASSWORD_KEY = "ssl.key.password";
private final String topic;
private final Integer replicas;
private final Integer taskCount;
private final Duration taskDuration;
private final Map<String, String> consumerProperties;
private final Map<String, Object> consumerProperties;
private final Duration startDelay;
private final Duration period;
private final boolean useEarliestOffset;
@ -52,7 +55,7 @@ public class KafkaSupervisorIOConfig
@JsonProperty("replicas") Integer replicas,
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@ -110,7 +113,7 @@ public class KafkaSupervisorIOConfig
}
@JsonProperty
public Map<String, String> getConsumerProperties()
public Map<String, Object> getConsumerProperties()
{
return consumerProperties;
}

View File

@ -486,7 +486,7 @@ public class KafkaIndexTaskTest
kafkaProducer.send(record).get();
}
}
Map<String, String> consumerProps = kafkaServer.consumerProperties();
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@ -581,7 +581,7 @@ public class KafkaIndexTaskTest
kafkaProducer.send(records.get(i)).get();
}
Map<String, String> consumerProps = kafkaServer.consumerProperties();
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@ -698,7 +698,7 @@ public class KafkaIndexTaskTest
kafkaProducer.send(record).get();
}
}
Map<String, String> consumerProps = kafkaServer.consumerProperties();
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@ -2027,7 +2027,8 @@ public class KafkaIndexTaskTest
context,
null,
null,
rowIngestionMetersFactory
rowIngestionMetersFactory,
objectMapper
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
@ -2073,7 +2074,8 @@ public class KafkaIndexTaskTest
context,
null,
null,
rowIngestionMetersFactory
rowIngestionMetersFactory,
objectMapper
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.hamcrest.CoreMatchers;
@ -32,6 +33,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Properties;
public class KafkaSupervisorIOConfigTest
{
private final ObjectMapper mapper;
@ -119,6 +122,31 @@ public class KafkaSupervisorIOConfigTest
Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
}
@Test
public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\",\n"
+ " \"ssl.truststore.password\":{\"type\": \"default\", \"password\": \"mytruststorepassword\"},\n"
+ " \"ssl.keystore.password\":{\"type\": \"default\", \"password\": \"mykeystorepassword\"},\n"
+ " \"ssl.key.password\":\"mykeypassword\"}\n"
+ "}";
KafkaSupervisorIOConfig config = mapper.readValue(
jsonStr, KafkaSupervisorIOConfig.class
);
Properties props = new Properties();
KafkaIndexTask.addConsumerPropertiesFromConfig(props, mapper, config.getConsumerProperties());
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertEquals("localhost:9092", props.getProperty("bootstrap.servers"));
Assert.assertEquals("mytruststorepassword", props.getProperty("ssl.truststore.password"));
Assert.assertEquals("mykeystorepassword", props.getProperty("ssl.keystore.password"));
Assert.assertEquals("mykeypassword", props.getProperty("ssl.key.password"));
}
@Test
public void testTopicRequired() throws Exception
{

View File

@ -2583,7 +2583,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
String kafkaHost
)
{
Map<String, String> consumerProperties = new HashMap<>();
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
@ -2711,7 +2711,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Collections.emptyMap(),
null,
null,
rowIngestionMetersFactory
rowIngestionMetersFactory,
objectMapper
);
}

View File

@ -110,9 +110,9 @@ public class TestBroker implements Closeable
return props;
}
public Map<String, String> consumerProperties()
public Map<String, Object> consumerProperties()
{
final Map<String, String> props = Maps.newHashMap();
final Map<String, Object> props = Maps.newHashMap();
props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());