Allow for kafka emitter producer secrets to be masked in logs (#15485)
* Allow Kafka emitter producer secrets to be masked in logs instead of being visible. Kafka producer config values that are secrets no longer show up in the logs. This improves security for users of the Kafka emitter who opt in; the feature is opt-in and does not affect existing configs for this emitter.
* Fix checkstyle issue.
* Change property name.
This commit is contained in:
parent da6b3cbc51
commit 901ebbb744
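For orientation, a minimal sketch of how an operator might opt in via runtime.properties. The property names follow the @JsonProperty annotations in the diff below; the broker address, the producer override, and the use of the environment-variable dynamic config provider are illustrative assumptions, not part of this commit:

druid.emitter=kafka
druid.emitter.kafka.bootstrap.servers=kafka01.example.com:9092
druid.emitter.kafka.producer.config={"retries":"3"}
druid.emitter.kafka.producer.hiddenProperties={"type":"environment","variables":{"sasl.jaas.config":"KAFKA_JAAS_CONFIG"}}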
KafkaEmitter.java
@@ -115,6 +115,7 @@ public class KafkaEmitter implements Emitter
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES);
     props.putAll(config.getKafkaProducerConfig());
+    props.putAll(config.getKafkaProducerSecrets().getConfig());
 
     return new KafkaProducer<>(props);
   }
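The added line above is the only runtime change in the emitter itself: secrets are resolved through DynamicConfigProvider.getConfig() at producer-construction time and merged into the same Properties object, after the plain producer.config map. A minimal sketch of that merge under those assumptions (the class name and all values here are illustrative; only the Druid types come from the commit):

import com.google.common.collect.ImmutableMap;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;

import java.util.Properties;

class ProducerSecretsMergeSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // Values from producer.config: these are logged as part of KafkaEmitterConfig.
    props.putAll(ImmutableMap.of("bootstrap.servers", "kafka01:9092", "retries", "3"));
    // Values from producer.hiddenProperties: resolved only here, never logged in plain text.
    DynamicConfigProvider<String> secrets =
        new MapStringDynamicConfigProvider(ImmutableMap.of("sasl.jaas.config", "example-secret"));
    props.putAll(secrets.getConfig());
    // Later entries win, so a secret can also override a non-secret value with the same key.
    System.out.println(props.stringPropertyNames());
  }
}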
KafkaEmitterConfig.java
@@ -26,6 +26,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.DynamicConfigProvider;
+import org.apache.druid.metadata.MapStringDynamicConfigProvider;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
 import javax.annotation.Nullable;
@@ -73,6 +75,8 @@ public class KafkaEmitterConfig
   private final String clusterName;
   @JsonProperty("producer.config")
   private final Map<String, String> kafkaProducerConfig;
+  @JsonProperty("producer.hiddenProperties")
+  private final DynamicConfigProvider<String> kafkaProducerSecrets;
 
   @JsonCreator
   public KafkaEmitterConfig(
@@ -83,7 +87,8 @@ public class KafkaEmitterConfig
       @Nullable @JsonProperty("request.topic") String requestTopic,
       @Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic,
       @JsonProperty("clusterName") String clusterName,
-      @JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig
+      @JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig,
+      @JsonProperty("producer.hiddenProperties") @Nullable DynamicConfigProvider<String> kafkaProducerSecrets
   )
   {
     this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "druid.emitter.kafka.bootstrap.servers can not be null");
@@ -94,6 +99,7 @@ public class KafkaEmitterConfig
     this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
     this.clusterName = clusterName;
     this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
+    this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
   }
 
   private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, String requestTopic)
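Backward compatibility comes from the null check in the constructor: omitting producer.hiddenProperties yields an empty MapStringDynamicConfigProvider rather than a null. A hypothetical regression test in the style of KafkaEmitterConfigTest shown further down (the JSON keys other than bootstrap.servers, and the topic values, are illustrative assumptions):

@Test
public void testLegacyConfigWithoutHiddenProperties() throws IOException
{
  // No "producer.hiddenProperties" key: the constructor substitutes an empty provider.
  KafkaEmitterConfig legacy = MAPPER.readerFor(KafkaEmitterConfig.class).readValue(
      "{\"bootstrap.servers\":\"kafka01:9092\",\"metric.topic\":\"m\",\"alert.topic\":\"a\"}"
  );
  Assert.assertTrue(legacy.getKafkaProducerSecrets().getConfig().isEmpty());
}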
@@ -159,6 +165,12 @@ public class KafkaEmitterConfig
     return kafkaProducerConfig;
   }
 
+  @JsonProperty
+  public DynamicConfigProvider<String> getKafkaProducerSecrets()
+  {
+    return kafkaProducerSecrets;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -198,7 +210,10 @@ public class KafkaEmitterConfig
     if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
       return false;
     }
-    return getKafkaProducerConfig().equals(that.getKafkaProducerConfig());
+    if (!getKafkaProducerConfig().equals(that.getKafkaProducerConfig())) {
+      return false;
+    }
+    return getKafkaProducerSecrets().getConfig().equals(that.getKafkaProducerSecrets().getConfig());
   }
 
   @Override
@@ -212,6 +227,7 @@ public class KafkaEmitterConfig
     result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0);
     result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
     result = 31 * result + getKafkaProducerConfig().hashCode();
+    result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
     return result;
   }
 
@@ -226,7 +242,8 @@ public class KafkaEmitterConfig
            ", request.topic='" + requestTopic + '\'' +
            ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
            ", clusterName='" + clusterName + '\'' +
-           ", Producer.config=" + kafkaProducerConfig +
+           ", producer.config=" + kafkaProducerConfig + '\'' +
+           ", producer.hiddenProperties=" + kafkaProducerSecrets +
            '}';
   }
 }
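The toString() hunk is what actually keeps secrets out of the logs: producer.config values still print, while producer.hiddenProperties prints only the provider object. Since MapStringDynamicConfigProvider does not, to my knowledge, override toString(), that renders as a class name and identity hash rather than the secret values. A sketch, assuming the extension's package layout for the import paths (all values illustrative):

import com.google.common.collect.ImmutableMap;
import org.apache.druid.emitter.kafka.KafkaEmitterConfig;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;

class ToStringMaskingSketch
{
  public static void main(String[] args)
  {
    KafkaEmitterConfig config = new KafkaEmitterConfig(
        "kafka01:9092", null, "metrics", "alerts", null, null, "cluster",
        ImmutableMap.of("retries", "3"),
        new MapStringDynamicConfigProvider(ImmutableMap.of("sasl.jaas.config", "example-secret"))
    );
    // Prints something like:
    //   ...producer.config={retries=3}', producer.hiddenProperties=org.apache.druid.metadata.MapStringDynamicConfigProvider@3f8f9dd6}
    // The secret value itself never appears in the line.
    System.out.println(config);
  }
}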
KafkaEmitterConfigTest.java
@@ -25,48 +25,67 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.metadata.DynamicConfigProvider;
+import org.apache.druid.metadata.MapStringDynamicConfigProvider;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 
 public class KafkaEmitterConfigTest
 {
-  private ObjectMapper mapper = new DefaultObjectMapper();
+  private static final DynamicConfigProvider<String> DEFAULT_PRODUCER_SECRETS = new MapStringDynamicConfigProvider(
+      ImmutableMap.of("testSecretKey", "testSecretValue"));
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
 
   @Before
   public void setUp()
   {
-    mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()));
+    MAPPER.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()));
   }
 
   @Test
   public void testSerDeserKafkaEmitterConfig() throws IOException
   {
-    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest",
-                                                                   "alertTest", "requestTest", "metadataTest",
-                                                                   "clusterNameTest", ImmutableMap.<String, String>builder()
-                                                                       .put("testKey", "testValue").build()
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
+        "hostname",
+        null,
+        "metricTest",
+        "alertTest",
+        "requestTest",
+        "metadataTest",
+        "clusterNameTest",
+        ImmutableMap.<String, String>builder()
+            .put("testKey", "testValue").build(),
+        DEFAULT_PRODUCER_SECRETS
     );
-    String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
-    KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
+    String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class)
                                                           .readValue(kafkaEmitterConfigString);
     Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
   }
 
   @Test
   public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
   {
-    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest",
-                                                                   "alertTest", null, "metadataTest",
-                                                                   "clusterNameTest", ImmutableMap.<String, String>builder()
-                                                                       .put("testKey", "testValue").build()
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
+        "hostname",
+        null,
+        "metricTest",
+        "alertTest",
+        null,
+        "metadataTest",
+        "clusterNameTest",
+        ImmutableMap.<String, String>builder()
+            .put("testKey", "testValue").build(),
+        DEFAULT_PRODUCER_SECRETS
    );
-    String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
-    KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
+    String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class)
                                                           .readValue(kafkaEmitterConfigString);
     Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
   }
 
@@ -75,27 +94,34 @@ public class KafkaEmitterConfigTest
   {
     Set<KafkaEmitterConfig.EventType> eventTypeSet = new HashSet<KafkaEmitterConfig.EventType>();
     eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA);
-    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", eventTypeSet, null,
-                                                                   null, null, "metadataTest",
-                                                                   "clusterNameTest", ImmutableMap.<String, String>builder()
-                                                                       .put("testKey", "testValue").build()
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
+        "hostname",
+        eventTypeSet,
+        null,
+        null,
+        null,
+        "metadataTest",
+        "clusterNameTest",
+        ImmutableMap.<String, String>builder()
+            .put("testKey", "testValue").build(),
+        DEFAULT_PRODUCER_SECRETS
     );
-    String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
-    KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
+    String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class)
                                                           .readValue(kafkaEmitterConfigString);
     Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
   }
 
   @Test
-  public void testSerDeNotRequiredKafkaProducerConfig()
+  public void testSerDeNotRequiredKafkaProducerConfigOrKafkaSecretProducer()
   {
     KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest",
                                                                    "alertTest", null, "metadataTest",
-                                                                   "clusterNameTest", null
+                                                                   "clusterNameTest", null, null
     );
     try {
       @SuppressWarnings("unused")
-      KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper);
+      KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, MAPPER);
     }
     catch (NullPointerException e) {
       Assert.fail();
@@ -105,9 +131,18 @@ public class KafkaEmitterConfigTest
   @Test
   public void testDeserializeEventTypesWithDifferentCase() throws JsonProcessingException
   {
-    Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA, mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class));
-    Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class));
-    Assert.assertThrows(ValueInstantiationException.class, () -> mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class));
+    Assert.assertEquals(
+        KafkaEmitterConfig.EventType.SEGMENT_METADATA,
+        MAPPER.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class)
+    );
+    Assert.assertEquals(
+        KafkaEmitterConfig.EventType.ALERTS,
+        MAPPER.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class)
+    );
+    Assert.assertThrows(
+        ValueInstantiationException.class,
+        () -> MAPPER.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class)
+    );
   }
 
   @Test
KafkaEmitterTest.java
@@ -106,7 +106,7 @@ public class KafkaEmitterTest
     ObjectMapper mapper = new ObjectMapper();
     mapper.registerModule(new JodaModule());
     final KafkaEmitter kafkaEmitter = new KafkaEmitter(
-        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null),
+        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null, null),
         mapper
     )
     {
DynamicConfigProviderUtils.java
@@ -38,9 +38,7 @@ public class DynamicConfigProviderUtils
         }
       }
       Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
-      for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
-        newConfig.put(entry.getKey(), entry.getValue());
-      }
+      newConfig.putAll(dynamicConfig);
     }
     return newConfig;
   }
@@ -55,9 +53,7 @@ public class DynamicConfigProviderUtils
         }
      }
       Map<String, String> dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
-      for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
-        newConfig.put(entry.getKey(), entry.getValue());
-      }
+      newConfig.putAll(dynamicConfig);
     }
     return newConfig;
   }
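The two DynamicConfigProviderUtils hunks are a behavior-preserving cleanup: copying a map entry by entry is exactly what Map.putAll does. A trivial equivalence check (all names and values here are illustrative):

import com.google.common.collect.ImmutableMap;

import java.util.HashMap;
import java.util.Map;

class PutAllEquivalenceSketch
{
  public static void main(String[] args)
  {
    Map<String, String> dynamicConfig = ImmutableMap.of("user", "druid", "password", "pw");

    // Old style: explicit loop over entries.
    Map<String, Object> viaLoop = new HashMap<>();
    for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
      viaLoop.put(entry.getKey(), entry.getValue());
    }

    // New style: single putAll call.
    Map<String, Object> viaPutAll = new HashMap<>();
    viaPutAll.putAll(dynamicConfig);

    System.out.println(viaLoop.equals(viaPutAll)); // true
  }
}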