From 11a8624ef17dc6b7c264ef3ec162485fa21dce69 Mon Sep 17 00:00:00 2001 From: Tom Date: Thu, 8 Feb 2024 22:55:24 -0800 Subject: [PATCH] allow for kafka-emitter to have extra dimensions be set for each event it emits (#15845) * allow for kafka-emitter to have extra dimensions be set for each event it emits * fix checktsyle issue in kafkaemitterconfig * make changes to fix docs, and cleanup copy paste error in #toString() * undo formatting to markdown table * add more branches so test passes * fix checkstyle issue --- .../extensions-contrib/kafka-emitter.md | 2 ++ .../druid/emitter/kafka/KafkaEmitter.java | 21 ++++++++++++++----- .../emitter/kafka/KafkaEmitterConfig.java | 21 +++++++++++++++---- .../emitter/kafka/KafkaEmitterConfigTest.java | 5 ++++- .../druid/emitter/kafka/KafkaEmitterTest.java | 2 +- 5 files changed, 40 insertions(+), 11 deletions(-) diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md index 40b63ca73af..a3641e89cec 100644 --- a/docs/development/extensions-contrib/kafka-emitter.md +++ b/docs/development/extensions-contrib/kafka-emitter.md @@ -46,6 +46,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter. | `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segment metadata. If `event.types` contains `segment_metadata`, this field cannot be empty. | no | none | | `druid.emitter.kafka.producer.config` | JSON configuration to set additional properties to Kafka producer. | no | none | | `druid.emitter.kafka.clusterName` | Optional value to specify the name of your Druid cluster. It can help make groups in your monitoring environment. | no | none | +| `druid.emitter.kafka.extra.dimensions` | Optional JSON configuration to specify a map of extra string dimensions for the events emitted. These can help make groups in your monitoring environment. | no | none | ### Example @@ -57,5 +58,6 @@ druid.emitter.kafka.alert.topic=druid-alert druid.emitter.kafka.request.topic=druid-request-logs druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata druid.emitter.kafka.producer.config={"max.block.ms":10000} +druid.emitter.kafka.extra.dimensions={"region":"us-east-1","environment":"preProd"} ``` diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 661543f707c..a0cb1a9afe1 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -198,11 +198,7 @@ public class KafkaEmitter implements Emitter if (event != null) { try { EventMap map = event.toMap(); - if (config.getClusterName() != null) { - map = map.asBuilder() - .put("clusterName", config.getClusterName()) - .build(); - } + map = addExtraDimensionsToEvent(map); String resultJson = jsonMapper.writeValueAsString(map); @@ -239,6 +235,21 @@ public class KafkaEmitter implements Emitter } } + private EventMap addExtraDimensionsToEvent(EventMap map) + { + if (config.getClusterName() != null || config.getExtraDimensions() != null) { + EventMap.Builder eventMapBuilder = map.asBuilder(); + if (config.getClusterName() != null) { + eventMapBuilder.put("clusterName", config.getClusterName()); + } + if (config.getExtraDimensions() != null) { + eventMapBuilder.putAll(config.getExtraDimensions()); + } + map = eventMapBuilder.build(); + } + return map; + } + @Override public void flush() { diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index c7038079aa4..86f636e853d 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -62,7 +62,7 @@ public class KafkaEmitterConfig public static final Set DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS); @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) private final String bootstrapServers; - @Nullable @JsonProperty("event.types") + @Nonnull @JsonProperty("event.types") private final Set eventTypes; @Nullable @JsonProperty("metric.topic") private final String metricTopic; @@ -72,8 +72,10 @@ public class KafkaEmitterConfig private final String requestTopic; @Nullable @JsonProperty("segmentMetadata.topic") private final String segmentMetadataTopic; - @JsonProperty + @Nullable @JsonProperty private final String clusterName; + @Nullable @JsonProperty("extra.dimensions") + private final Map extraDimensions; @JsonProperty("producer.config") private final Map kafkaProducerConfig; @JsonProperty("producer.hiddenProperties") @@ -87,7 +89,8 @@ public class KafkaEmitterConfig @Nullable @JsonProperty("alert.topic") String alertTopic, @Nullable @JsonProperty("request.topic") String requestTopic, @Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic, - @JsonProperty("clusterName") String clusterName, + @Nullable @JsonProperty("clusterName") String clusterName, + @Nullable @JsonProperty("extra.dimensions") Map extraDimensions, @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig, @JsonProperty("producer.hiddenProperties") @Nullable DynamicConfigProvider kafkaProducerSecrets ) @@ -99,10 +102,12 @@ public class KafkaEmitterConfig this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can not be null") : null; this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null; this.clusterName = clusterName; + this.extraDimensions = extraDimensions; this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig; this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets; } + @Nonnull private Set maybeUpdateEventTypes(Set eventTypes, String requestTopic) { // Unless explicitly overridden, kafka emitter will always emit metrics and alerts @@ -143,12 +148,18 @@ public class KafkaEmitterConfig return alertTopic; } - @JsonProperty + @Nullable @JsonProperty public String getClusterName() { return clusterName; } + @Nullable + public Map getExtraDimensions() + { + return extraDimensions; + } + @Nullable public String getRequestTopic() { @@ -228,6 +239,7 @@ public class KafkaEmitterConfig result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0); result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); + result = 31 * result + (getExtraDimensions() != null ? getExtraDimensions().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode(); return result; @@ -244,6 +256,7 @@ public class KafkaEmitterConfig ", request.topic='" + requestTopic + '\'' + ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' + ", clusterName='" + clusterName + '\'' + + ", extra.dimensions='" + extraDimensions + '\'' + ", producer.config=" + kafkaProducerConfig + '\'' + ", producer.hiddenProperties=" + kafkaProducerSecrets + '}'; diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 603c8e6701b..fb10868d362 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -58,6 +58,7 @@ public class KafkaEmitterConfigTest "requestTest", "metadataTest", "clusterNameTest", + ImmutableMap.of("env", "preProd"), ImmutableMap.builder() .put("testKey", "testValue").build(), DEFAULT_PRODUCER_SECRETS @@ -79,6 +80,7 @@ public class KafkaEmitterConfigTest null, "metadataTest", "clusterNameTest", + null, ImmutableMap.builder() .put("testKey", "testValue").build(), DEFAULT_PRODUCER_SECRETS @@ -102,6 +104,7 @@ public class KafkaEmitterConfigTest null, "metadataTest", "clusterNameTest", + null, ImmutableMap.builder() .put("testKey", "testValue").build(), DEFAULT_PRODUCER_SECRETS @@ -117,7 +120,7 @@ public class KafkaEmitterConfigTest { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest", "alertTest", null, "metadataTest", - "clusterNameTest", null, null + "clusterNameTest", null, null, null ); try { @SuppressWarnings("unused") diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index 1c30bee12fa..228982b6cd7 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -106,7 +106,7 @@ public class KafkaEmitterTest ObjectMapper mapper = new ObjectMapper(); mapper.registerModule(new JodaModule()); final KafkaEmitter kafkaEmitter = new KafkaEmitter( - new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null, null), + new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", ImmutableMap.of("clusterId", "cluster-101"), null, null), mapper ) {