mirror of https://github.com/apache/druid.git
allow for kafka-emitter to have extra dimensions be set for each event it emits (#15845)
* allow for kafka-emitter to have extra dimensions be set for each event it emits * fix checktsyle issue in kafkaemitterconfig * make changes to fix docs, and cleanup copy paste error in #toString() * undo formatting to markdown table * add more branches so test passes * fix checkstyle issue
This commit is contained in:
parent
d703b2c709
commit
11a8624ef1
|
@ -46,6 +46,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter.
|
||||||
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segment metadata. If `event.types` contains `segment_metadata`, this field cannot be empty. | no | none |
|
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segment metadata. If `event.types` contains `segment_metadata`, this field cannot be empty. | no | none |
|
||||||
| `druid.emitter.kafka.producer.config` | JSON configuration to set additional properties to Kafka producer. | no | none |
|
| `druid.emitter.kafka.producer.config` | JSON configuration to set additional properties to Kafka producer. | no | none |
|
||||||
| `druid.emitter.kafka.clusterName` | Optional value to specify the name of your Druid cluster. It can help make groups in your monitoring environment. | no | none |
|
| `druid.emitter.kafka.clusterName` | Optional value to specify the name of your Druid cluster. It can help make groups in your monitoring environment. | no | none |
|
||||||
|
| `druid.emitter.kafka.extra.dimensions` | Optional JSON configuration to specify a map of extra string dimensions for the events emitted. These can help make groups in your monitoring environment. | no | none |
|
||||||
|
|
||||||
### Example
|
### Example
|
||||||
|
|
||||||
|
@ -57,5 +58,6 @@ druid.emitter.kafka.alert.topic=druid-alert
|
||||||
druid.emitter.kafka.request.topic=druid-request-logs
|
druid.emitter.kafka.request.topic=druid-request-logs
|
||||||
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
|
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
|
||||||
druid.emitter.kafka.producer.config={"max.block.ms":10000}
|
druid.emitter.kafka.producer.config={"max.block.ms":10000}
|
||||||
|
druid.emitter.kafka.extra.dimensions={"region":"us-east-1","environment":"preProd"}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -198,11 +198,7 @@ public class KafkaEmitter implements Emitter
|
||||||
if (event != null) {
|
if (event != null) {
|
||||||
try {
|
try {
|
||||||
EventMap map = event.toMap();
|
EventMap map = event.toMap();
|
||||||
if (config.getClusterName() != null) {
|
map = addExtraDimensionsToEvent(map);
|
||||||
map = map.asBuilder()
|
|
||||||
.put("clusterName", config.getClusterName())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultJson = jsonMapper.writeValueAsString(map);
|
String resultJson = jsonMapper.writeValueAsString(map);
|
||||||
|
|
||||||
|
@ -239,6 +235,21 @@ public class KafkaEmitter implements Emitter
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private EventMap addExtraDimensionsToEvent(EventMap map)
|
||||||
|
{
|
||||||
|
if (config.getClusterName() != null || config.getExtraDimensions() != null) {
|
||||||
|
EventMap.Builder eventMapBuilder = map.asBuilder();
|
||||||
|
if (config.getClusterName() != null) {
|
||||||
|
eventMapBuilder.put("clusterName", config.getClusterName());
|
||||||
|
}
|
||||||
|
if (config.getExtraDimensions() != null) {
|
||||||
|
eventMapBuilder.putAll(config.getExtraDimensions());
|
||||||
|
}
|
||||||
|
map = eventMapBuilder.build();
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flush()
|
public void flush()
|
||||||
{
|
{
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class KafkaEmitterConfig
|
||||||
public static final Set<EventType> DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
|
public static final Set<EventType> DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
|
||||||
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
|
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
|
||||||
private final String bootstrapServers;
|
private final String bootstrapServers;
|
||||||
@Nullable @JsonProperty("event.types")
|
@Nonnull @JsonProperty("event.types")
|
||||||
private final Set<EventType> eventTypes;
|
private final Set<EventType> eventTypes;
|
||||||
@Nullable @JsonProperty("metric.topic")
|
@Nullable @JsonProperty("metric.topic")
|
||||||
private final String metricTopic;
|
private final String metricTopic;
|
||||||
|
@ -72,8 +72,10 @@ public class KafkaEmitterConfig
|
||||||
private final String requestTopic;
|
private final String requestTopic;
|
||||||
@Nullable @JsonProperty("segmentMetadata.topic")
|
@Nullable @JsonProperty("segmentMetadata.topic")
|
||||||
private final String segmentMetadataTopic;
|
private final String segmentMetadataTopic;
|
||||||
@JsonProperty
|
@Nullable @JsonProperty
|
||||||
private final String clusterName;
|
private final String clusterName;
|
||||||
|
@Nullable @JsonProperty("extra.dimensions")
|
||||||
|
private final Map<String, String> extraDimensions;
|
||||||
@JsonProperty("producer.config")
|
@JsonProperty("producer.config")
|
||||||
private final Map<String, String> kafkaProducerConfig;
|
private final Map<String, String> kafkaProducerConfig;
|
||||||
@JsonProperty("producer.hiddenProperties")
|
@JsonProperty("producer.hiddenProperties")
|
||||||
|
@ -87,7 +89,8 @@ public class KafkaEmitterConfig
|
||||||
@Nullable @JsonProperty("alert.topic") String alertTopic,
|
@Nullable @JsonProperty("alert.topic") String alertTopic,
|
||||||
@Nullable @JsonProperty("request.topic") String requestTopic,
|
@Nullable @JsonProperty("request.topic") String requestTopic,
|
||||||
@Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic,
|
@Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic,
|
||||||
@JsonProperty("clusterName") String clusterName,
|
@Nullable @JsonProperty("clusterName") String clusterName,
|
||||||
|
@Nullable @JsonProperty("extra.dimensions") Map<String, String> extraDimensions,
|
||||||
@JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig,
|
@JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig,
|
||||||
@JsonProperty("producer.hiddenProperties") @Nullable DynamicConfigProvider<String> kafkaProducerSecrets
|
@JsonProperty("producer.hiddenProperties") @Nullable DynamicConfigProvider<String> kafkaProducerSecrets
|
||||||
)
|
)
|
||||||
|
@ -99,10 +102,12 @@ public class KafkaEmitterConfig
|
||||||
this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can not be null") : null;
|
this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can not be null") : null;
|
||||||
this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
|
this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
|
||||||
this.clusterName = clusterName;
|
this.clusterName = clusterName;
|
||||||
|
this.extraDimensions = extraDimensions;
|
||||||
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
|
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
|
||||||
this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
|
this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nonnull
|
||||||
private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, String requestTopic)
|
private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, String requestTopic)
|
||||||
{
|
{
|
||||||
// Unless explicitly overridden, kafka emitter will always emit metrics and alerts
|
// Unless explicitly overridden, kafka emitter will always emit metrics and alerts
|
||||||
|
@ -143,12 +148,18 @@ public class KafkaEmitterConfig
|
||||||
return alertTopic;
|
return alertTopic;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@Nullable @JsonProperty
|
||||||
public String getClusterName()
|
public String getClusterName()
|
||||||
{
|
{
|
||||||
return clusterName;
|
return clusterName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public Map<String, String> getExtraDimensions()
|
||||||
|
{
|
||||||
|
return extraDimensions;
|
||||||
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public String getRequestTopic()
|
public String getRequestTopic()
|
||||||
{
|
{
|
||||||
|
@ -228,6 +239,7 @@ public class KafkaEmitterConfig
|
||||||
result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
|
result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
|
||||||
result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0);
|
result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0);
|
||||||
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
|
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
|
||||||
|
result = 31 * result + (getExtraDimensions() != null ? getExtraDimensions().hashCode() : 0);
|
||||||
result = 31 * result + getKafkaProducerConfig().hashCode();
|
result = 31 * result + getKafkaProducerConfig().hashCode();
|
||||||
result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
|
result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
|
||||||
return result;
|
return result;
|
||||||
|
@ -244,6 +256,7 @@ public class KafkaEmitterConfig
|
||||||
", request.topic='" + requestTopic + '\'' +
|
", request.topic='" + requestTopic + '\'' +
|
||||||
", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
|
", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
|
||||||
", clusterName='" + clusterName + '\'' +
|
", clusterName='" + clusterName + '\'' +
|
||||||
|
", extra.dimensions='" + extraDimensions + '\'' +
|
||||||
", producer.config=" + kafkaProducerConfig + '\'' +
|
", producer.config=" + kafkaProducerConfig + '\'' +
|
||||||
", producer.hiddenProperties=" + kafkaProducerSecrets +
|
", producer.hiddenProperties=" + kafkaProducerSecrets +
|
||||||
'}';
|
'}';
|
||||||
|
|
|
@ -58,6 +58,7 @@ public class KafkaEmitterConfigTest
|
||||||
"requestTest",
|
"requestTest",
|
||||||
"metadataTest",
|
"metadataTest",
|
||||||
"clusterNameTest",
|
"clusterNameTest",
|
||||||
|
ImmutableMap.of("env", "preProd"),
|
||||||
ImmutableMap.<String, String>builder()
|
ImmutableMap.<String, String>builder()
|
||||||
.put("testKey", "testValue").build(),
|
.put("testKey", "testValue").build(),
|
||||||
DEFAULT_PRODUCER_SECRETS
|
DEFAULT_PRODUCER_SECRETS
|
||||||
|
@ -79,6 +80,7 @@ public class KafkaEmitterConfigTest
|
||||||
null,
|
null,
|
||||||
"metadataTest",
|
"metadataTest",
|
||||||
"clusterNameTest",
|
"clusterNameTest",
|
||||||
|
null,
|
||||||
ImmutableMap.<String, String>builder()
|
ImmutableMap.<String, String>builder()
|
||||||
.put("testKey", "testValue").build(),
|
.put("testKey", "testValue").build(),
|
||||||
DEFAULT_PRODUCER_SECRETS
|
DEFAULT_PRODUCER_SECRETS
|
||||||
|
@ -102,6 +104,7 @@ public class KafkaEmitterConfigTest
|
||||||
null,
|
null,
|
||||||
"metadataTest",
|
"metadataTest",
|
||||||
"clusterNameTest",
|
"clusterNameTest",
|
||||||
|
null,
|
||||||
ImmutableMap.<String, String>builder()
|
ImmutableMap.<String, String>builder()
|
||||||
.put("testKey", "testValue").build(),
|
.put("testKey", "testValue").build(),
|
||||||
DEFAULT_PRODUCER_SECRETS
|
DEFAULT_PRODUCER_SECRETS
|
||||||
|
@ -117,7 +120,7 @@ public class KafkaEmitterConfigTest
|
||||||
{
|
{
|
||||||
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest",
|
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest",
|
||||||
"alertTest", null, "metadataTest",
|
"alertTest", null, "metadataTest",
|
||||||
"clusterNameTest", null, null
|
"clusterNameTest", null, null, null
|
||||||
);
|
);
|
||||||
try {
|
try {
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
|
|
|
@ -106,7 +106,7 @@ public class KafkaEmitterTest
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
mapper.registerModule(new JodaModule());
|
mapper.registerModule(new JodaModule());
|
||||||
final KafkaEmitter kafkaEmitter = new KafkaEmitter(
|
final KafkaEmitter kafkaEmitter = new KafkaEmitter(
|
||||||
new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null, null),
|
new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", ImmutableMap.of("clusterId", "cluster-101"), null, null),
|
||||||
mapper
|
mapper
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue