Kafka lag emitter - Kafka Indexing Service (#4194)

* Kafka lag emitter

* enforce minimum emit period to a minute

* fixed comment
This commit is contained in:
Parag Jain 2017-05-02 18:30:07 -05:00 committed by David Lim
parent 5e85fcc0f5
commit f9a61ea2ba
3 changed files with 148 additions and 3 deletions

View File

@ -40,6 +40,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.MonitorSchedulerConfig;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskLocation;
@ -93,6 +96,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
/**
* Supervisor responsible for managing the KafkaIndexTasks for a single dataSource. At a high level, the class accepts a
@ -180,6 +184,8 @@ public class KafkaSupervisor implements Supervisor
private final KafkaIndexTaskClient taskClient;
private final ObjectMapper sortingMapper;
private final KafkaSupervisorSpec spec;
private final ServiceEmitter emitter;
private final MonitorSchedulerConfig monitorSchedulerConfig;
private final String dataSource;
private final KafkaSupervisorIOConfig ioConfig;
private final KafkaSupervisorTuningConfig tuningConfig;
@ -200,9 +206,14 @@ public class KafkaSupervisor implements Supervisor
private volatile DateTime firstRunTime;
private volatile KafkaConsumer consumer;
private volatile KafkaConsumer lagComputingConsumer;
private volatile boolean started = false;
private volatile boolean stopped = false;
private final ScheduledExecutorService metricEmittingExec;
// used while reporting lag
private final Map<Integer, Long> lastCurrentOffsets = new HashMap<>();
public KafkaSupervisor(
final TaskStorage taskStorage,
final TaskMaster taskMaster,
@ -217,6 +228,8 @@ public class KafkaSupervisor implements Supervisor
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.sortingMapper = mapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
this.spec = spec;
this.emitter = spec.getEmitter();
this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
@ -225,6 +238,7 @@ public class KafkaSupervisor implements Supervisor
this.supervisorId = String.format("KafkaSupervisor-%s", dataSource);
this.exec = Execs.singleThreaded(supervisorId);
this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + "-Scheduler-%d");
this.metricEmittingExec = Execs.scheduledSingleThreaded(supervisorId + "-Emitter-%d");
int workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
@ -302,6 +316,7 @@ public class KafkaSupervisor implements Supervisor
try {
consumer = getKafkaConsumer();
lagComputingConsumer = getKafkaConsumer();
exec.submit(
new Runnable()
@ -337,6 +352,13 @@ public class KafkaSupervisor implements Supervisor
TimeUnit.MILLISECONDS
);
metricEmittingExec.scheduleAtFixedRate(
computeAndEmitLag(taskClient),
ioConfig.getStartDelay().getMillis() + 10000, // wait for tasks to start up
Math.max(monitorSchedulerConfig.getEmitterPeriod().getMillis(), 60 * 1000),
TimeUnit.MILLISECONDS
);
started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
@ -349,6 +371,9 @@ public class KafkaSupervisor implements Supervisor
if (consumer != null) {
consumer.close();
}
if (lagComputingConsumer != null) {
lagComputingConsumer.close();
}
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw Throwables.propagate(e);
@ -366,6 +391,7 @@ public class KafkaSupervisor implements Supervisor
try {
scheduledExec.shutdownNow(); // stop recurring executions
metricEmittingExec.shutdownNow();
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) {
@ -499,6 +525,7 @@ public class KafkaSupervisor implements Supervisor
public void handle() throws InterruptedException, ExecutionException, TimeoutException
{
consumer.close();
lagComputingConsumer.close();
synchronized (stopLock) {
stopped = true;
@ -1677,4 +1704,100 @@ public class KafkaSupervisor implements Supervisor
}
};
}
private Runnable computeAndEmitLag(final KafkaIndexTaskClient taskClient)
{
return new Runnable()
{
@Override
public void run()
{
try {
final Map<String, List<PartitionInfo>> topics = lagComputingConsumer.listTopics();
final List<PartitionInfo> partitionInfoList = topics.get(ioConfig.getTopic());
lagComputingConsumer.assign(
Lists.transform(partitionInfoList, new Function<PartitionInfo, TopicPartition>()
{
@Override
public TopicPartition apply(PartitionInfo input)
{
return new TopicPartition(ioConfig.getTopic(), input.partition());
}
})
);
final Map<Integer, Long> offsetsResponse = new ConcurrentHashMap<>();
final List<ListenableFuture<Void>> futures = Lists.newArrayList();
for (TaskGroup taskGroup : taskGroups.values()) {
for (String taskId : taskGroup.taskIds()) {
futures.add(Futures.transform(
taskClient.getCurrentOffsetsAsync(taskId, false),
new Function<Map<Integer, Long>, Void>()
{
@Override
public Void apply(Map<Integer, Long> taskResponse)
{
if (taskResponse != null) {
for (final Map.Entry<Integer, Long> partitionOffsets : taskResponse.entrySet()) {
offsetsResponse.compute(partitionOffsets.getKey(), new BiFunction<Integer, Long, Long>()
{
@Override
public Long apply(Integer key, Long existingOffsetInMap)
{
// If existing value is null use the offset returned by task
// otherwise use the max (makes sure max offset is taken from replicas)
return existingOffsetInMap == null
? partitionOffsets.getValue()
: Math.max(partitionOffsets.getValue(), existingOffsetInMap);
}
});
}
}
return null;
}
}
)
);
}
}
// not using futureTimeoutInSeconds as its min value is 120 seconds
// and minimum emission period for this metric is 60 seconds
Futures.successfulAsList(futures).get(30, TimeUnit.SECONDS);
// for each partition, seek to end to get the highest offset
// check the offsetsResponse map for the latest consumed offset
// if partition info not present in offsetsResponse then use lastCurrentOffsets map
// if not present there as well, fail the compute
long lag = 0;
for (PartitionInfo partitionInfo : partitionInfoList) {
long diff;
final TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partitionInfo.partition());
lagComputingConsumer.seekToEnd(ImmutableList.of(topicPartition));
if (offsetsResponse.get(topicPartition.partition()) != null) {
diff = lagComputingConsumer.position(topicPartition) - offsetsResponse.get(topicPartition.partition());
lastCurrentOffsets.put(topicPartition.partition(), offsetsResponse.get(topicPartition.partition()));
} else if (lastCurrentOffsets.get(topicPartition.partition()) != null) {
diff = lagComputingConsumer.position(topicPartition) - lastCurrentOffsets.get(topicPartition.partition());
} else {
throw new ISE("Could not find latest consumed offset for partition [%d]", topicPartition.partition());
}
lag += diff;
log.debug(
"Topic - [%s] Partition - [%d] : Partition lag [%,d], Total lag so far [%,d]",
topicPartition.topic(),
topicPartition.partition(),
diff,
lag
);
}
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", lag)
);
}
catch (Exception e) {
log.warn(e, "Unable to compute Kafka lag");
}
}
};
}
}

View File

@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorSchedulerConfig;
import io.druid.guice.annotations.Json;
import io.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@ -49,6 +51,8 @@ public class KafkaSupervisorSpec implements SupervisorSpec
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final KafkaIndexTaskClientFactory kafkaIndexTaskClientFactory;
private final ObjectMapper mapper;
private final ServiceEmitter emitter;
private final MonitorSchedulerConfig monitorSchedulerConfig;
@JsonCreator
public KafkaSupervisorSpec(
@ -60,7 +64,9 @@ public class KafkaSupervisorSpec implements SupervisorSpec
@JacksonInject TaskMaster taskMaster,
@JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
@JacksonInject KafkaIndexTaskClientFactory kafkaIndexTaskClientFactory,
@JacksonInject @Json ObjectMapper mapper
@JacksonInject @Json ObjectMapper mapper,
@JacksonInject ServiceEmitter emitter,
@JacksonInject MonitorSchedulerConfig monitorSchedulerConfig
)
{
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
@ -91,6 +97,8 @@ public class KafkaSupervisorSpec implements SupervisorSpec
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.kafkaIndexTaskClientFactory = kafkaIndexTaskClientFactory;
this.mapper = mapper;
this.emitter = emitter;
this.monitorSchedulerConfig = monitorSchedulerConfig;
}
@JsonProperty
@ -117,12 +125,22 @@ public class KafkaSupervisorSpec implements SupervisorSpec
return context;
}
public ServiceEmitter getEmitter()
{
return emitter;
}
@Override
public String getId()
{
return dataSchema.getDataSource();
}
public MonitorSchedulerConfig getMonitorSchedulerConfig()
{
return monitorSchedulerConfig;
}
@Override
public Supervisor createSupervisor()
{

View File

@ -65,6 +65,8 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment;
import io.druid.server.metrics.DruidMonitorSchedulerConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import org.apache.curator.test.TestingCluster;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -1718,7 +1720,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
objectMapper
objectMapper,
new NoopServiceEmitter(),
new DruidMonitorSchedulerConfig()
)
);
}