Kafka emitter wasn't given the correct number of threads. It should be 1 thread per scheduled task. (#15719)

This change intelligently provisions the correct number of threads per scheduled task. 1 for each event type, and 1 for logging the lost events.
This is a change to make this work. But in the future it would be worthwhile to make each task not be greedy and share threads so there isn't a need of a thread per task.
This commit is contained in:
Tom 2024-01-18 13:27:40 -08:00 committed by GitHub
parent c8c04f1032
commit 18d42cae3f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 5 additions and 1 deletions

View File

@ -84,7 +84,9 @@ public class KafkaEmitter implements Emitter
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(4);
// need one thread per scheduled task. Scheduled tasks are per eventType and 1 for reporting the lost events
int numOfThreads = config.getEventTypes().size() + 1;
this.scheduler = Executors.newScheduledThreadPool(numOfThreads);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.requestLost = new AtomicLong(0L);

View File

@ -30,6 +30,7 @@ import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
import org.apache.kafka.clients.producer.ProducerConfig;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.Map;
@ -124,6 +125,7 @@ public class KafkaEmitterConfig
}
@JsonProperty
@Nonnull
public Set<EventType> getEventTypes()
{
return eventTypes;