Report Kafka lag information in supervisor status report (#4314)

* refactor lag reporting and report lag at status endpoint

* refactor offset reporting logic to fetch offsets periodically vs. at request time

* remove JavaCompatUtils

* code review changes

* code review changes
This commit is contained in:
David Lim 2017-06-05 14:26:25 -06:00 committed by Jonathan Wei
parent a2584d214a
commit 13ecf90923
10 changed files with 305 additions and 233 deletions

View File

@ -129,6 +129,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.|no (default == 8)|
|`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)|
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
|`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)|
#### IndexSpec
@ -211,7 +212,10 @@ Returns the current spec for the supervisor with the provided ID.
```
GET /druid/indexer/v1/supervisor/<supervisorId>/status
```
Returns a snapshot report of the current state of the tasks managed by the given supervisor.
Returns a snapshot report of the current state of the tasks managed by the given supervisor. This includes the latest
offsets as reported by Kafka, the consumer lag per partition, as well as the aggregate lag of all partitions. The
consumer lag per partition may be reported as negative values if the supervisor has not received a recent latest offset
response from Kafka. The aggregate lag value will always be >= 0.
#### Get All Supervisor History
```

View File

@ -67,7 +67,6 @@ import io.druid.indexing.overlord.supervisor.Supervisor;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.collect.JavaCompatUtils;
import io.druid.metadata.EntryExistsException;
import io.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.commons.codec.digest.DigestUtils;
@ -96,7 +95,8 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Supervisor responsible for managing the KafkaIndexTasks for a single dataSource. At a high level, the class accepts a
@ -113,6 +113,9 @@ public class KafkaSupervisor implements Supervisor
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us from running too often in response to events
private static final long NOT_SET = -1;
private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
// Internal data structures
// --------------------------------------------------------
@ -146,14 +149,15 @@ public class KafkaSupervisor implements Supervisor
Set<String> taskIds()
{
return JavaCompatUtils.keySet(tasks);
return tasks.keySet();
}
}
private static class TaskData
{
TaskStatus status;
DateTime startTime;
volatile TaskStatus status;
volatile DateTime startTime;
volatile Map<Integer, Long> currentOffsets = new HashMap<>();
}
// Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
@ -196,23 +200,22 @@ public class KafkaSupervisor implements Supervisor
private final ExecutorService exec;
private final ScheduledExecutorService scheduledExec;
private final ScheduledExecutorService reportingExec;
private final ListeningExecutorService workerExec;
private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
private final Object stopLock = new Object();
private final Object stateChangeLock = new Object();
private final Object consumerLock = new Object();
private boolean listenerRegistered = false;
private long lastRunTime;
private volatile DateTime firstRunTime;
private volatile KafkaConsumer consumer;
private volatile KafkaConsumer lagComputingConsumer;
private volatile boolean started = false;
private volatile boolean stopped = false;
private final ScheduledExecutorService metricEmittingExec;
// used while reporting lag
private final Map<Integer, Long> lastCurrentOffsets = new HashMap<>();
private volatile Map<Integer, Long> latestOffsetsFromKafka;
private volatile DateTime offsetsLastUpdated;
public KafkaSupervisor(
final TaskStorage taskStorage,
@ -238,7 +241,7 @@ public class KafkaSupervisor implements Supervisor
this.supervisorId = String.format("KafkaSupervisor-%s", dataSource);
this.exec = Execs.singleThreaded(supervisorId);
this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + "-Scheduler-%d");
this.metricEmittingExec = Execs.scheduledSingleThreaded(supervisorId + "-Emitter-%d");
this.reportingExec = Execs.scheduledSingleThreaded(supervisorId + "-Reporting-%d");
int workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
@ -316,7 +319,6 @@ public class KafkaSupervisor implements Supervisor
try {
consumer = getKafkaConsumer();
lagComputingConsumer = getKafkaConsumer();
exec.submit(
new Runnable()
@ -352,10 +354,19 @@ public class KafkaSupervisor implements Supervisor
TimeUnit.MILLISECONDS
);
metricEmittingExec.scheduleAtFixedRate(
computeAndEmitLag(taskClient),
ioConfig.getStartDelay().getMillis() + 10000, // wait for tasks to start up
Math.max(monitorSchedulerConfig.getEmitterPeriod().getMillis(), 60 * 1000),
reportingExec.scheduleAtFixedRate(
updateCurrentAndLatestOffsets(),
ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
Math.max(
tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS
),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
emitLag(),
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
monitorSchedulerConfig.getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
@ -371,9 +382,6 @@ public class KafkaSupervisor implements Supervisor
if (consumer != null) {
consumer.close();
}
if (lagComputingConsumer != null) {
lagComputingConsumer.close();
}
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw Throwables.propagate(e);
@ -391,7 +399,7 @@ public class KafkaSupervisor implements Supervisor
try {
scheduledExec.shutdownNow(); // stop recurring executions
metricEmittingExec.shutdownNow();
reportingExec.shutdownNow();
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) {
@ -525,7 +533,6 @@ public class KafkaSupervisor implements Supervisor
public void handle() throws InterruptedException, ExecutionException, TimeoutException
{
consumer.close();
lagComputingConsumer.close();
synchronized (stopLock) {
stopped = true;
@ -558,7 +565,7 @@ public class KafkaSupervisor implements Supervisor
// Reset everything
boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
killTaskGroupForPartitions(JavaCompatUtils.keySet(taskGroups));
killTaskGroupForPartitions(taskGroups.keySet());
} else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
} else {
@ -614,8 +621,7 @@ public class KafkaSupervisor implements Supervisor
}
}
if (metadataUpdateSuccess) {
killTaskGroupForPartitions(JavaCompatUtils.keySet(resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()));
killTaskGroupForPartitions(resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet());
} else {
throw new ISE("Unable to reset metadata");
}
@ -635,7 +641,7 @@ public class KafkaSupervisor implements Supervisor
TaskGroup taskGroup = taskGroups.get(getTaskGroupIdForPartition(partition));
if (taskGroup != null) {
// kill all tasks in this task group
for (String taskId : JavaCompatUtils.keySet(taskGroup.tasks)) {
for (String taskId : taskGroup.tasks.keySet()) {
log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId);
killTask(taskId);
}
@ -747,7 +753,9 @@ public class KafkaSupervisor implements Supervisor
{
Map<String, List<PartitionInfo>> topics;
try {
topics = consumer.listTopics(); // updates the consumer's list of partitions from the brokers
synchronized (consumerLock) {
topics = consumer.listTopics(); // updates the consumer's list of partitions from the brokers
}
}
catch (Exception e) { // calls to the consumer throw NPEs when the broker doesn't respond
log.warn(
@ -1342,13 +1350,9 @@ public class KafkaSupervisor implements Supervisor
void createNewTasks()
{
// check that there is a current task group for each group of partitions in [partitionGroups]
for (Integer groupId : JavaCompatUtils.keySet(partitionGroups)) {
for (Integer groupId : partitionGroups.keySet()) {
if (!taskGroups.containsKey(groupId)) {
log.info(
"Creating new task group [%d] for partitions %s",
groupId,
JavaCompatUtils.keySet(partitionGroups.get(groupId))
);
log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet());
Optional<DateTime> minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
DateTime.now().minus(ioConfig.getLateMessageRejectionPeriod().get())
@ -1507,18 +1511,20 @@ public class KafkaSupervisor implements Supervisor
private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset)
{
TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partition);
if (!consumer.assignment().contains(topicPartition)) {
consumer.assign(Collections.singletonList(topicPartition));
}
synchronized (consumerLock) {
TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partition);
if (!consumer.assignment().contains(topicPartition)) {
consumer.assign(Collections.singletonList(topicPartition));
}
if (useEarliestOffset) {
consumer.seekToBeginning(Collections.singletonList(topicPartition));
} else {
consumer.seekToEnd(Collections.singletonList(topicPartition));
}
if (useEarliestOffset) {
consumer.seekToBeginning(Collections.singletonList(topicPartition));
} else {
consumer.seekToEnd(Collections.singletonList(topicPartition));
}
return consumer.position(topicPartition);
return consumer.position(topicPartition);
}
}
/**
@ -1602,28 +1608,30 @@ public class KafkaSupervisor implements Supervisor
private KafkaSupervisorReport generateReport(boolean includeOffsets)
{
int numPartitions = 0;
for (Map<Integer, Long> partitionGroup : partitionGroups.values()) {
numPartitions += partitionGroup.size();
}
int numPartitions = partitionGroups.values().stream().mapToInt(Map::size).sum();
Map<Integer, Long> partitionLag = getLagPerPartition(getHighestCurrentOffsets());
KafkaSupervisorReport report = new KafkaSupervisorReport(
dataSource,
DateTime.now(),
ioConfig.getTopic(),
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000
ioConfig.getTaskDuration().getMillis() / 1000,
includeOffsets ? latestOffsetsFromKafka : null,
includeOffsets ? partitionLag : null,
includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null,
includeOffsets ? offsetsLastUpdated : null
);
List<TaskReportData> taskReports = Lists.newArrayList();
List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
try {
for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (startTime != null) {
remainingSeconds = Math.max(
@ -1634,17 +1642,14 @@ public class KafkaSupervisor implements Supervisor
taskReports.add(
new TaskReportData(
taskId,
(includeOffsets ? taskGroup.partitionOffsets : null),
null,
includeOffsets ? taskGroup.partitionOffsets : null,
includeOffsets ? currentOffsets : null,
startTime,
remainingSeconds,
TaskReportData.TaskType.ACTIVE
TaskReportData.TaskType.ACTIVE,
includeOffsets ? getLagPerPartition(currentOffsets) : null
)
);
if (includeOffsets) {
futures.add(taskClient.getCurrentOffsetsAsync(taskId, false));
}
}
}
@ -1653,6 +1658,7 @@ public class KafkaSupervisor implements Supervisor
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (taskGroup.completionTimeout != null) {
remainingSeconds = Math.max(0, taskGroup.completionTimeout.getMillis() - DateTime.now().getMillis())
@ -1662,30 +1668,19 @@ public class KafkaSupervisor implements Supervisor
taskReports.add(
new TaskReportData(
taskId,
(includeOffsets ? taskGroup.partitionOffsets : null),
null,
includeOffsets ? taskGroup.partitionOffsets : null,
includeOffsets ? currentOffsets : null,
startTime,
remainingSeconds,
TaskReportData.TaskType.PUBLISHING
TaskReportData.TaskType.PUBLISHING,
null
)
);
if (includeOffsets) {
futures.add(taskClient.getCurrentOffsetsAsync(taskId, false));
}
}
}
}
List<Map<Integer, Long>> results = Futures.successfulAsList(futures)
.get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < taskReports.size(); i++) {
TaskReportData reportData = taskReports.get(i);
if (includeOffsets) {
reportData.setCurrentOffsets(results.get(i));
}
report.addTask(reportData);
}
taskReports.stream().forEach(report::addTask);
}
catch (Exception e) {
log.warn(e, "Failed to generate status report");
@ -1696,111 +1691,128 @@ public class KafkaSupervisor implements Supervisor
private Runnable buildRunTask()
{
return new Runnable()
{
@Override
public void run()
{
notices.add(new RunNotice());
return () -> notices.add(new RunNotice());
}
private void updateLatestOffsetsFromKafka()
{
synchronized (consumerLock) {
final Map<String, List<PartitionInfo>> topics = consumer.listTopics();
if (topics == null || !topics.containsKey(ioConfig.getTopic())) {
throw new ISE("Could not retrieve partitions for topic [%s]", ioConfig.getTopic());
}
final Set<TopicPartition> topicPartitions = topics.get(ioConfig.getTopic())
.stream()
.map(x -> new TopicPartition(x.topic(), x.partition()))
.collect(Collectors.toSet());
consumer.assign(topicPartitions);
consumer.seekToEnd(topicPartitions);
latestOffsetsFromKafka = topicPartitions
.stream()
.collect(Collectors.toMap(TopicPartition::partition, consumer::position));
}
}
private Map<Integer, Long> getHighestCurrentOffsets()
{
return taskGroups
.values()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentOffsets.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
}
private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
{
return currentOffsets
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> latestOffsetsFromKafka != null
&& latestOffsetsFromKafka.get(e.getKey()) != null
&& e.getValue() != null
? latestOffsetsFromKafka.get(e.getKey()) - e.getValue()
: null
)
);
}
private Runnable emitLag()
{
return () -> {
try {
Map<Integer, Long> highestCurrentOffsets = getHighestCurrentOffsets();
if (latestOffsetsFromKafka == null) {
throw new ISE("Latest offsets from Kafka have not been fetched");
}
if (!latestOffsetsFromKafka.keySet().equals(highestCurrentOffsets.keySet())) {
log.warn(
"Lag metric: Kafka partitions %s do not match task partitions %s",
latestOffsetsFromKafka.keySet(),
highestCurrentOffsets.keySet()
);
}
long lag = getLagPerPartition(highestCurrentOffsets)
.values()
.stream()
.mapToLong(x -> Math.max(x, 0))
.sum();
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", lag)
);
}
catch (Exception e) {
log.warn(e, "Unable to compute Kafka lag");
}
};
}
private Runnable computeAndEmitLag(final KafkaIndexTaskClient taskClient)
private void updateCurrentOffsets() throws InterruptedException, ExecutionException, TimeoutException
{
return new Runnable()
{
@Override
public void run()
{
try {
final Map<String, List<PartitionInfo>> topics = lagComputingConsumer.listTopics();
final List<PartitionInfo> partitionInfoList = topics.get(ioConfig.getTopic());
lagComputingConsumer.assign(
Lists.transform(partitionInfoList, new Function<PartitionInfo, TopicPartition>()
{
@Override
public TopicPartition apply(PartitionInfo input)
{
return new TopicPartition(ioConfig.getTopic(), input.partition());
}
})
);
final Map<Integer, Long> offsetsResponse = new ConcurrentHashMap<>();
final List<ListenableFuture<Void>> futures = Lists.newArrayList();
for (TaskGroup taskGroup : taskGroups.values()) {
for (String taskId : taskGroup.taskIds()) {
futures.add(Futures.transform(
taskClient.getCurrentOffsetsAsync(taskId, false),
new Function<Map<Integer, Long>, Void>()
{
@Override
public Void apply(Map<Integer, Long> taskResponse)
{
if (taskResponse != null) {
for (final Map.Entry<Integer, Long> partitionOffsets : taskResponse.entrySet()) {
offsetsResponse.compute(partitionOffsets.getKey(), new BiFunction<Integer, Long, Long>()
{
@Override
public Long apply(Integer key, Long existingOffsetInMap)
{
// If existing value is null use the offset returned by task
// otherwise use the max (makes sure max offset is taken from replicas)
return existingOffsetInMap == null
? partitionOffsets.getValue()
: Math.max(partitionOffsets.getValue(), existingOffsetInMap);
}
});
}
}
return null;
}
}
)
);
}
}
// not using futureTimeoutInSeconds as its min value is 120 seconds
// and minimum emission period for this metric is 60 seconds
Futures.successfulAsList(futures).get(30, TimeUnit.SECONDS);
final List<ListenableFuture<Void>> futures = Stream.concat(
taskGroups.values().stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()),
pendingCompletionTaskGroups.values()
.stream()
.flatMap(List::stream)
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
).map(
task -> Futures.transform(
taskClient.getCurrentOffsetsAsync(task.getKey(), false),
(Function<Map<Integer, Long>, Void>) (currentOffsets) -> {
// for each partition, seek to end to get the highest offset
// check the offsetsResponse map for the latest consumed offset
// if partition info not present in offsetsResponse then use lastCurrentOffsets map
// if not present there as well, fail the compute
if (currentOffsets != null && !currentOffsets.isEmpty()) {
task.getValue().currentOffsets = currentOffsets;
}
long lag = 0;
for (PartitionInfo partitionInfo : partitionInfoList) {
long diff;
final TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partitionInfo.partition());
lagComputingConsumer.seekToEnd(ImmutableList.of(topicPartition));
if (offsetsResponse.get(topicPartition.partition()) != null) {
diff = lagComputingConsumer.position(topicPartition) - offsetsResponse.get(topicPartition.partition());
lastCurrentOffsets.put(topicPartition.partition(), offsetsResponse.get(topicPartition.partition()));
} else if (lastCurrentOffsets.get(topicPartition.partition()) != null) {
diff = lagComputingConsumer.position(topicPartition) - lastCurrentOffsets.get(topicPartition.partition());
} else {
throw new ISE("Could not find latest consumed offset for partition [%d]", topicPartition.partition());
return null;
}
lag += diff;
log.debug(
"Topic - [%s] Partition - [%d] : Partition lag [%,d], Total lag so far [%,d]",
topicPartition.topic(),
topicPartition.partition(),
diff,
lag
);
}
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", lag)
);
}
catch (InterruptedException e) {
// do nothing, probably we are shutting down
}
catch (Exception e) {
log.warn(e, "Unable to compute Kafka lag");
}
)
).collect(Collectors.toList());
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
@VisibleForTesting
Runnable updateCurrentAndLatestOffsets()
{
return () -> {
try {
updateCurrentOffsets();
updateLatestOffsetsFromKafka();
offsetsLastUpdated = DateTime.now();
}
catch (Exception e) {
log.warn(e, "Exception while getting current/latest offsets");
}
};
}

View File

@ -19,15 +19,16 @@
package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import io.druid.java.util.common.IAE;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
public class KafkaSupervisorReport extends SupervisorReport
{
@ -40,13 +41,21 @@ public class KafkaSupervisorReport extends SupervisorReport
private final Long durationSeconds;
private final List<TaskReportData> activeTasks;
private final List<TaskReportData> publishingTasks;
private final Map<Integer, Long> latestOffsets;
private final Map<Integer, Long> minimumLag;
private final Long aggregateLag;
private final DateTime offsetsLastUpdated;
public KafkaSupervisorReportPayload(
String dataSource,
String topic,
Integer partitions,
Integer replicas,
Long durationSeconds
Long durationSeconds,
@Nullable Map<Integer, Long> latestOffsets,
@Nullable Map<Integer, Long> minimumLag,
@Nullable Long aggregateLag,
@Nullable DateTime offsetsLastUpdated
)
{
this.dataSource = dataSource;
@ -56,6 +65,10 @@ public class KafkaSupervisorReport extends SupervisorReport
this.durationSeconds = durationSeconds;
this.activeTasks = Lists.newArrayList();
this.publishingTasks = Lists.newArrayList();
this.latestOffsets = latestOffsets;
this.minimumLag = minimumLag;
this.aggregateLag = aggregateLag;
this.offsetsLastUpdated = offsetsLastUpdated;
}
@JsonProperty
@ -100,6 +113,33 @@ public class KafkaSupervisorReport extends SupervisorReport
return publishingTasks;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<Integer, Long> getLatestOffsets()
{
return latestOffsets;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<Integer, Long> getMinimumLag()
{
return minimumLag;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Long getAggregateLag()
{
return aggregateLag;
}
@JsonProperty
public DateTime getOffsetsLastUpdated()
{
return offsetsLastUpdated;
}
@Override
public String toString()
{
@ -111,6 +151,10 @@ public class KafkaSupervisorReport extends SupervisorReport
", durationSeconds=" + durationSeconds +
", active=" + activeTasks +
", publishing=" + publishingTasks +
(latestOffsets != null ? ", latestOffsets=" + latestOffsets : "") +
(minimumLag != null ? ", minimumLag=" + minimumLag : "") +
(aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") +
(offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") +
'}';
}
}
@ -123,11 +167,25 @@ public class KafkaSupervisorReport extends SupervisorReport
String topic,
Integer partitions,
Integer replicas,
Long durationSeconds
Long durationSeconds,
@Nullable Map<Integer, Long> latestOffsets,
@Nullable Map<Integer, Long> minimumLag,
@Nullable Long aggregateLag,
@Nullable DateTime offsetsLastUpdated
)
{
super(dataSource, generationTime);
this.payload = new KafkaSupervisorReportPayload(dataSource, topic, partitions, replicas, durationSeconds);
this.payload = new KafkaSupervisorReportPayload(
dataSource,
topic,
partitions,
replicas,
durationSeconds,
latestOffsets,
minimumLag,
aggregateLag,
offsetsLastUpdated
);
}
@Override

View File

@ -87,6 +87,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
null,
null,
null,
null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");

View File

@ -34,6 +34,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
private final Long chatRetries;
private final Duration httpTimeout;
private final Duration shutdownTimeout;
private final Duration offsetFetchPeriod;
public KafkaSupervisorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@ -50,7 +51,8 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
@JsonProperty("chatThreads") Integer chatThreads,
@JsonProperty("chatRetries") Long chatRetries,
@JsonProperty("httpTimeout") Period httpTimeout,
@JsonProperty("shutdownTimeout") Period shutdownTimeout
@JsonProperty("shutdownTimeout") Period shutdownTimeout,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod
)
{
super(
@ -73,6 +75,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
this.chatRetries = (chatRetries != null ? chatRetries : 8);
this.httpTimeout = defaultDuration(httpTimeout, "PT10S");
this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S");
this.offsetFetchPeriod = defaultDuration(offsetFetchPeriod, "PT30S");
}
@JsonProperty
@ -105,6 +108,12 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
return shutdownTimeout;
}
@JsonProperty
public Duration getOffsetFetchPeriod()
{
return offsetFetchPeriod;
}
@Override
public String toString()
{
@ -124,6 +133,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
", chatRetries=" + chatRetries +
", httpTimeout=" + httpTimeout +
", shutdownTimeout=" + shutdownTimeout +
", offsetFetchPeriod=" + offsetFetchPeriod +
'}';
}

View File

@ -19,9 +19,11 @@
package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Map;
public class TaskReportData
@ -36,15 +38,17 @@ public class TaskReportData
private final DateTime startTime;
private final Long remainingSeconds;
private final TaskType type;
private Map<Integer, Long> currentOffsets;
private final Map<Integer, Long> currentOffsets;
private final Map<Integer, Long> lag;
public TaskReportData(
String id,
Map<Integer, Long> startingOffsets,
Map<Integer, Long> currentOffsets,
@Nullable Map<Integer, Long> startingOffsets,
@Nullable Map<Integer, Long> currentOffsets,
DateTime startTime,
Long remainingSeconds,
TaskType type
TaskType type,
@Nullable Map<Integer, Long> lag
)
{
this.id = id;
@ -53,6 +57,7 @@ public class TaskReportData
this.startTime = startTime;
this.remainingSeconds = remainingSeconds;
this.type = type;
this.lag = lag;
}
@JsonProperty
@ -62,22 +67,19 @@ public class TaskReportData
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<Integer, Long> getStartingOffsets()
{
return startingOffsets;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<Integer, Long> getCurrentOffsets()
{
return currentOffsets;
}
public void setCurrentOffsets(Map<Integer, Long> currentOffsets)
{
this.currentOffsets = currentOffsets;
}
@JsonProperty
public DateTime getStartTime()
{
@ -96,6 +98,13 @@ public class TaskReportData
return type;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<Integer, Long> getLag()
{
return lag;
}
@Override
public String toString()
{
@ -105,6 +114,7 @@ public class TaskReportData
(currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") +
", startTime=" + startTime +
", remainingSeconds=" + remainingSeconds +
(lag != null ? ", lag=" + lag : "") +
'}';
}
}

View File

@ -194,7 +194,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT
TEST_SHUTDOWN_TIMEOUT,
null
);
topic = getTopic();
@ -999,6 +1000,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
SupervisorReport report = supervisor.getStatus();
verifyAll();
@ -1088,6 +1090,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
SupervisorReport report = supervisor.getStatus();
verifyAll();
@ -1142,7 +1145,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final DateTime startTime = new DateTime();
supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
addSomeEvents(1);
addSomeEvents(6);
Task id1 = createKafkaIndexTask(
"id1",
@ -1157,7 +1160,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id2",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
@ -1183,16 +1186,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getCurrentOffsetsAsync("id1", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
expect(taskClient.getCurrentOffsetsAsync("id2", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 40L, 1, 50L, 2, 60L)));
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 4L, 1, 5L, 2, 6L)));
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
SupervisorReport report = supervisor.getStatus();
verifyAll();
@ -1215,12 +1219,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("id2", activeReport.getId());
Assert.assertEquals(startTime, activeReport.getStartTime());
Assert.assertEquals(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), activeReport.getStartingOffsets());
Assert.assertEquals(ImmutableMap.of(0, 40L, 1, 50L, 2, 60L), activeReport.getCurrentOffsets());
Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), activeReport.getStartingOffsets());
Assert.assertEquals(ImmutableMap.of(0, 4L, 1, 5L, 2, 6L), activeReport.getCurrentOffsets());
Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), activeReport.getLag());
Assert.assertEquals("id1", publishingReport.getId());
Assert.assertEquals(ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), publishingReport.getStartingOffsets());
Assert.assertEquals(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), publishingReport.getCurrentOffsets());
Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), publishingReport.getCurrentOffsets());
Assert.assertEquals(null, publishingReport.getLag());
Assert.assertEquals(ImmutableMap.of(0, 6L, 1, 6L, 2, 6L), payload.getLatestOffsets());
Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), payload.getMinimumLag());
Assert.assertEquals(3L, (long) payload.getAggregateLag());
Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
}
@Test

View File

@ -71,6 +71,7 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(8L, (long) config.getChatRetries());
Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout());
Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout());
Assert.assertEquals(Duration.standardSeconds(30), config.getOffsetFetchPeriod());
}
@Test
@ -90,7 +91,8 @@ public class KafkaSupervisorTuningConfigTest
+ " \"chatThreads\": 13,\n"
+ " \"chatRetries\": 14,\n"
+ " \"httpTimeout\": \"PT15S\",\n"
+ " \"shutdownTimeout\": \"PT95S\"\n"
+ " \"shutdownTimeout\": \"PT95S\",\n"
+ " \"offsetFetchPeriod\": \"PT20S\"\n"
+ "}";
KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
@ -116,5 +118,6 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(14L, (long) config.getChatRetries());
Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout());
Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout());
Assert.assertEquals(Duration.standardSeconds(20), config.getOffsetFetchPeriod());
}
}

View File

@ -24,10 +24,8 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.collect.JavaCompatUtils;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.metadata.MetadataSupervisorManager;
@ -59,7 +57,7 @@ public class SupervisorManager
public Set<String> getSupervisorIds()
{
return JavaCompatUtils.keySet(supervisors);
return supervisors.keySet();
}
public Optional<SupervisorSpec> getSupervisorSpec(String id)
@ -117,7 +115,7 @@ public class SupervisorManager
Preconditions.checkState(started, "SupervisorManager not started");
synchronized (lock) {
for (String id : JavaCompatUtils.keySet(supervisors)) {
for (String id : supervisors.keySet()) {
try {
supervisors.get(id).lhs.stop(false);
}

View File

@ -1,35 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common.collect;
import java.util.Map;
import java.util.Set;
public class JavaCompatUtils
{
/**
* Equivalent to theMap.keySet(), but works around a Java 7 compat issue. See also
* https://github.com/druid-io/druid/issues/3795.
*/
public static <K, V> Set<K> keySet(Map<K, V> theMap)
{
return theMap.keySet();
}
}