diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 0ce192c11b8..7049f212b8b 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -129,6 +129,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.|no (default == 8)| |`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)| |`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)| +|`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)| #### IndexSpec @@ -211,7 +212,10 @@ Returns the current spec for the supervisor with the provided ID. ``` GET /druid/indexer/v1/supervisor//status ``` -Returns a snapshot report of the current state of the tasks managed by the given supervisor. +Returns a snapshot report of the current state of the tasks managed by the given supervisor. This includes the latest +offsets as reported by Kafka, the consumer lag per partition, as well as the aggregate lag of all partitions. The +consumer lag per partition may be reported as negative values if the supervisor has not received a recent latest offset +response from Kafka. The aggregate lag value will always be >= 0. #### Get All Supervisor History ``` diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 3634ece1faf..349afac438c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -67,7 +67,6 @@ import io.druid.indexing.overlord.supervisor.Supervisor; import io.druid.indexing.overlord.supervisor.SupervisorReport; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.collect.JavaCompatUtils; import io.druid.metadata.EntryExistsException; import io.druid.server.metrics.DruidMonitorSchedulerConfig; import org.apache.commons.codec.digest.DigestUtils; @@ -96,7 +95,8 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Supervisor responsible for managing the KafkaIndexTasks for a single dataSource. At a high level, the class accepts a @@ -113,6 +113,9 @@ public class KafkaSupervisor implements Supervisor private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us from running too often in response to events private static final long NOT_SET = -1; private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120; + private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000; + private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000; + private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000; // Internal data structures // -------------------------------------------------------- @@ -146,14 +149,15 @@ public class KafkaSupervisor implements Supervisor Set taskIds() { - return JavaCompatUtils.keySet(tasks); + return tasks.keySet(); } } private static class TaskData { - TaskStatus status; - DateTime startTime; + volatile TaskStatus status; + volatile DateTime startTime; + volatile Map currentOffsets = new HashMap<>(); } // Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class @@ -196,23 +200,22 @@ public class KafkaSupervisor implements Supervisor private final ExecutorService exec; private final ScheduledExecutorService scheduledExec; + private final ScheduledExecutorService reportingExec; private final ListeningExecutorService workerExec; private final BlockingQueue notices = new LinkedBlockingDeque<>(); private final Object stopLock = new Object(); private final Object stateChangeLock = new Object(); + private final Object consumerLock = new Object(); private boolean listenerRegistered = false; private long lastRunTime; private volatile DateTime firstRunTime; private volatile KafkaConsumer consumer; - private volatile KafkaConsumer lagComputingConsumer; private volatile boolean started = false; private volatile boolean stopped = false; - - private final ScheduledExecutorService metricEmittingExec; - // used while reporting lag - private final Map lastCurrentOffsets = new HashMap<>(); + private volatile Map latestOffsetsFromKafka; + private volatile DateTime offsetsLastUpdated; public KafkaSupervisor( final TaskStorage taskStorage, @@ -238,7 +241,7 @@ public class KafkaSupervisor implements Supervisor this.supervisorId = String.format("KafkaSupervisor-%s", dataSource); this.exec = Execs.singleThreaded(supervisorId); this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + "-Scheduler-%d"); - this.metricEmittingExec = Execs.scheduledSingleThreaded(supervisorId + "-Emitter-%d"); + this.reportingExec = Execs.scheduledSingleThreaded(supervisorId + "-Reporting-%d"); int workerThreads = (this.tuningConfig.getWorkerThreads() != null ? this.tuningConfig.getWorkerThreads() @@ -316,7 +319,6 @@ public class KafkaSupervisor implements Supervisor try { consumer = getKafkaConsumer(); - lagComputingConsumer = getKafkaConsumer(); exec.submit( new Runnable() @@ -352,10 +354,19 @@ public class KafkaSupervisor implements Supervisor TimeUnit.MILLISECONDS ); - metricEmittingExec.scheduleAtFixedRate( - computeAndEmitLag(taskClient), - ioConfig.getStartDelay().getMillis() + 10000, // wait for tasks to start up - Math.max(monitorSchedulerConfig.getEmitterPeriod().getMillis(), 60 * 1000), + reportingExec.scheduleAtFixedRate( + updateCurrentAndLatestOffsets(), + ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up + Math.max( + tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS + ), + TimeUnit.MILLISECONDS + ); + + reportingExec.scheduleAtFixedRate( + emitLag(), + ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up + monitorSchedulerConfig.getEmitterPeriod().getMillis(), TimeUnit.MILLISECONDS ); @@ -371,9 +382,6 @@ public class KafkaSupervisor implements Supervisor if (consumer != null) { consumer.close(); } - if (lagComputingConsumer != null) { - lagComputingConsumer.close(); - } log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource) .emit(); throw Throwables.propagate(e); @@ -391,7 +399,7 @@ public class KafkaSupervisor implements Supervisor try { scheduledExec.shutdownNow(); // stop recurring executions - metricEmittingExec.shutdownNow(); + reportingExec.shutdownNow(); Optional taskRunner = taskMaster.getTaskRunner(); if (taskRunner.isPresent()) { @@ -525,7 +533,6 @@ public class KafkaSupervisor implements Supervisor public void handle() throws InterruptedException, ExecutionException, TimeoutException { consumer.close(); - lagComputingConsumer.close(); synchronized (stopLock) { stopped = true; @@ -558,7 +565,7 @@ public class KafkaSupervisor implements Supervisor // Reset everything boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource); log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result); - killTaskGroupForPartitions(JavaCompatUtils.keySet(taskGroups)); + killTaskGroupForPartitions(taskGroups.keySet()); } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) { throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass()); } else { @@ -614,8 +621,7 @@ public class KafkaSupervisor implements Supervisor } } if (metadataUpdateSuccess) { - killTaskGroupForPartitions(JavaCompatUtils.keySet(resetKafkaMetadata.getKafkaPartitions() - .getPartitionOffsetMap())); + killTaskGroupForPartitions(resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet()); } else { throw new ISE("Unable to reset metadata"); } @@ -635,7 +641,7 @@ public class KafkaSupervisor implements Supervisor TaskGroup taskGroup = taskGroups.get(getTaskGroupIdForPartition(partition)); if (taskGroup != null) { // kill all tasks in this task group - for (String taskId : JavaCompatUtils.keySet(taskGroup.tasks)) { + for (String taskId : taskGroup.tasks.keySet()) { log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId); killTask(taskId); } @@ -747,7 +753,9 @@ public class KafkaSupervisor implements Supervisor { Map> topics; try { - topics = consumer.listTopics(); // updates the consumer's list of partitions from the brokers + synchronized (consumerLock) { + topics = consumer.listTopics(); // updates the consumer's list of partitions from the brokers + } } catch (Exception e) { // calls to the consumer throw NPEs when the broker doesn't respond log.warn( @@ -1342,13 +1350,9 @@ public class KafkaSupervisor implements Supervisor void createNewTasks() { // check that there is a current task group for each group of partitions in [partitionGroups] - for (Integer groupId : JavaCompatUtils.keySet(partitionGroups)) { + for (Integer groupId : partitionGroups.keySet()) { if (!taskGroups.containsKey(groupId)) { - log.info( - "Creating new task group [%d] for partitions %s", - groupId, - JavaCompatUtils.keySet(partitionGroups.get(groupId)) - ); + log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet()); Optional minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of( DateTime.now().minus(ioConfig.getLateMessageRejectionPeriod().get()) @@ -1507,18 +1511,20 @@ public class KafkaSupervisor implements Supervisor private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset) { - TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partition); - if (!consumer.assignment().contains(topicPartition)) { - consumer.assign(Collections.singletonList(topicPartition)); - } + synchronized (consumerLock) { + TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partition); + if (!consumer.assignment().contains(topicPartition)) { + consumer.assign(Collections.singletonList(topicPartition)); + } - if (useEarliestOffset) { - consumer.seekToBeginning(Collections.singletonList(topicPartition)); - } else { - consumer.seekToEnd(Collections.singletonList(topicPartition)); - } + if (useEarliestOffset) { + consumer.seekToBeginning(Collections.singletonList(topicPartition)); + } else { + consumer.seekToEnd(Collections.singletonList(topicPartition)); + } - return consumer.position(topicPartition); + return consumer.position(topicPartition); + } } /** @@ -1602,28 +1608,30 @@ public class KafkaSupervisor implements Supervisor private KafkaSupervisorReport generateReport(boolean includeOffsets) { - int numPartitions = 0; - for (Map partitionGroup : partitionGroups.values()) { - numPartitions += partitionGroup.size(); - } + int numPartitions = partitionGroups.values().stream().mapToInt(Map::size).sum(); + Map partitionLag = getLagPerPartition(getHighestCurrentOffsets()); KafkaSupervisorReport report = new KafkaSupervisorReport( dataSource, DateTime.now(), ioConfig.getTopic(), numPartitions, ioConfig.getReplicas(), - ioConfig.getTaskDuration().getMillis() / 1000 + ioConfig.getTaskDuration().getMillis() / 1000, + includeOffsets ? latestOffsetsFromKafka : null, + includeOffsets ? partitionLag : null, + includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null, + includeOffsets ? offsetsLastUpdated : null ); List taskReports = Lists.newArrayList(); - List>> futures = Lists.newArrayList(); try { for (TaskGroup taskGroup : taskGroups.values()) { for (Map.Entry entry : taskGroup.tasks.entrySet()) { String taskId = entry.getKey(); DateTime startTime = entry.getValue().startTime; + Map currentOffsets = entry.getValue().currentOffsets; Long remainingSeconds = null; if (startTime != null) { remainingSeconds = Math.max( @@ -1634,17 +1642,14 @@ public class KafkaSupervisor implements Supervisor taskReports.add( new TaskReportData( taskId, - (includeOffsets ? taskGroup.partitionOffsets : null), - null, + includeOffsets ? taskGroup.partitionOffsets : null, + includeOffsets ? currentOffsets : null, startTime, remainingSeconds, - TaskReportData.TaskType.ACTIVE + TaskReportData.TaskType.ACTIVE, + includeOffsets ? getLagPerPartition(currentOffsets) : null ) ); - - if (includeOffsets) { - futures.add(taskClient.getCurrentOffsetsAsync(taskId, false)); - } } } @@ -1653,6 +1658,7 @@ public class KafkaSupervisor implements Supervisor for (Map.Entry entry : taskGroup.tasks.entrySet()) { String taskId = entry.getKey(); DateTime startTime = entry.getValue().startTime; + Map currentOffsets = entry.getValue().currentOffsets; Long remainingSeconds = null; if (taskGroup.completionTimeout != null) { remainingSeconds = Math.max(0, taskGroup.completionTimeout.getMillis() - DateTime.now().getMillis()) @@ -1662,30 +1668,19 @@ public class KafkaSupervisor implements Supervisor taskReports.add( new TaskReportData( taskId, - (includeOffsets ? taskGroup.partitionOffsets : null), - null, + includeOffsets ? taskGroup.partitionOffsets : null, + includeOffsets ? currentOffsets : null, startTime, remainingSeconds, - TaskReportData.TaskType.PUBLISHING + TaskReportData.TaskType.PUBLISHING, + null ) ); - - if (includeOffsets) { - futures.add(taskClient.getCurrentOffsetsAsync(taskId, false)); - } } } } - List> results = Futures.successfulAsList(futures) - .get(futureTimeoutInSeconds, TimeUnit.SECONDS); - for (int i = 0; i < taskReports.size(); i++) { - TaskReportData reportData = taskReports.get(i); - if (includeOffsets) { - reportData.setCurrentOffsets(results.get(i)); - } - report.addTask(reportData); - } + taskReports.stream().forEach(report::addTask); } catch (Exception e) { log.warn(e, "Failed to generate status report"); @@ -1696,111 +1691,128 @@ public class KafkaSupervisor implements Supervisor private Runnable buildRunTask() { - return new Runnable() - { - @Override - public void run() - { - notices.add(new RunNotice()); + return () -> notices.add(new RunNotice()); + } + + private void updateLatestOffsetsFromKafka() + { + synchronized (consumerLock) { + final Map> topics = consumer.listTopics(); + + if (topics == null || !topics.containsKey(ioConfig.getTopic())) { + throw new ISE("Could not retrieve partitions for topic [%s]", ioConfig.getTopic()); + } + + final Set topicPartitions = topics.get(ioConfig.getTopic()) + .stream() + .map(x -> new TopicPartition(x.topic(), x.partition())) + .collect(Collectors.toSet()); + consumer.assign(topicPartitions); + consumer.seekToEnd(topicPartitions); + + latestOffsetsFromKafka = topicPartitions + .stream() + .collect(Collectors.toMap(TopicPartition::partition, consumer::position)); + } + } + + private Map getHighestCurrentOffsets() + { + return taskGroups + .values() + .stream() + .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) + .flatMap(taskData -> taskData.getValue().currentOffsets.entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max)); + } + + private Map getLagPerPartition(Map currentOffsets) + { + return currentOffsets + .entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> latestOffsetsFromKafka != null + && latestOffsetsFromKafka.get(e.getKey()) != null + && e.getValue() != null + ? latestOffsetsFromKafka.get(e.getKey()) - e.getValue() + : null + ) + ); + } + + private Runnable emitLag() + { + return () -> { + try { + Map highestCurrentOffsets = getHighestCurrentOffsets(); + + if (latestOffsetsFromKafka == null) { + throw new ISE("Latest offsets from Kafka have not been fetched"); + } + + if (!latestOffsetsFromKafka.keySet().equals(highestCurrentOffsets.keySet())) { + log.warn( + "Lag metric: Kafka partitions %s do not match task partitions %s", + latestOffsetsFromKafka.keySet(), + highestCurrentOffsets.keySet() + ); + } + + long lag = getLagPerPartition(highestCurrentOffsets) + .values() + .stream() + .mapToLong(x -> Math.max(x, 0)) + .sum(); + + emitter.emit( + ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", lag) + ); + } + catch (Exception e) { + log.warn(e, "Unable to compute Kafka lag"); } }; } - private Runnable computeAndEmitLag(final KafkaIndexTaskClient taskClient) + private void updateCurrentOffsets() throws InterruptedException, ExecutionException, TimeoutException { - return new Runnable() - { - @Override - public void run() - { - try { - final Map> topics = lagComputingConsumer.listTopics(); - final List partitionInfoList = topics.get(ioConfig.getTopic()); - lagComputingConsumer.assign( - Lists.transform(partitionInfoList, new Function() - { - @Override - public TopicPartition apply(PartitionInfo input) - { - return new TopicPartition(ioConfig.getTopic(), input.partition()); - } - }) - ); - final Map offsetsResponse = new ConcurrentHashMap<>(); - final List> futures = Lists.newArrayList(); - for (TaskGroup taskGroup : taskGroups.values()) { - for (String taskId : taskGroup.taskIds()) { - futures.add(Futures.transform( - taskClient.getCurrentOffsetsAsync(taskId, false), - new Function, Void>() - { - @Override - public Void apply(Map taskResponse) - { - if (taskResponse != null) { - for (final Map.Entry partitionOffsets : taskResponse.entrySet()) { - offsetsResponse.compute(partitionOffsets.getKey(), new BiFunction() - { - @Override - public Long apply(Integer key, Long existingOffsetInMap) - { - // If existing value is null use the offset returned by task - // otherwise use the max (makes sure max offset is taken from replicas) - return existingOffsetInMap == null - ? partitionOffsets.getValue() - : Math.max(partitionOffsets.getValue(), existingOffsetInMap); - } - }); - } - } - return null; - } - } - ) - ); - } - } - // not using futureTimeoutInSeconds as its min value is 120 seconds - // and minimum emission period for this metric is 60 seconds - Futures.successfulAsList(futures).get(30, TimeUnit.SECONDS); + final List> futures = Stream.concat( + taskGroups.values().stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()), + pendingCompletionTaskGroups.values() + .stream() + .flatMap(List::stream) + .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) + ).map( + task -> Futures.transform( + taskClient.getCurrentOffsetsAsync(task.getKey(), false), + (Function, Void>) (currentOffsets) -> { - // for each partition, seek to end to get the highest offset - // check the offsetsResponse map for the latest consumed offset - // if partition info not present in offsetsResponse then use lastCurrentOffsets map - // if not present there as well, fail the compute + if (currentOffsets != null && !currentOffsets.isEmpty()) { + task.getValue().currentOffsets = currentOffsets; + } - long lag = 0; - for (PartitionInfo partitionInfo : partitionInfoList) { - long diff; - final TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partitionInfo.partition()); - lagComputingConsumer.seekToEnd(ImmutableList.of(topicPartition)); - if (offsetsResponse.get(topicPartition.partition()) != null) { - diff = lagComputingConsumer.position(topicPartition) - offsetsResponse.get(topicPartition.partition()); - lastCurrentOffsets.put(topicPartition.partition(), offsetsResponse.get(topicPartition.partition())); - } else if (lastCurrentOffsets.get(topicPartition.partition()) != null) { - diff = lagComputingConsumer.position(topicPartition) - lastCurrentOffsets.get(topicPartition.partition()); - } else { - throw new ISE("Could not find latest consumed offset for partition [%d]", topicPartition.partition()); + return null; } - lag += diff; - log.debug( - "Topic - [%s] Partition - [%d] : Partition lag [%,d], Total lag so far [%,d]", - topicPartition.topic(), - topicPartition.partition(), - diff, - lag - ); - } - emitter.emit( - ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", lag) - ); - } - catch (InterruptedException e) { - // do nothing, probably we are shutting down - } - catch (Exception e) { - log.warn(e, "Unable to compute Kafka lag"); - } + ) + ).collect(Collectors.toList()); + + Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS); + } + + @VisibleForTesting + Runnable updateCurrentAndLatestOffsets() + { + return () -> { + try { + updateCurrentOffsets(); + updateLatestOffsetsFromKafka(); + offsetsLastUpdated = DateTime.now(); + } + catch (Exception e) { + log.warn(e, "Exception while getting current/latest offsets"); } }; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorReport.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorReport.java index a6e0c761c1a..37bbf7c8016 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorReport.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorReport.java @@ -19,15 +19,16 @@ package io.druid.indexing.kafka.supervisor; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; - import io.druid.indexing.overlord.supervisor.SupervisorReport; import io.druid.java.util.common.IAE; - import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.List; +import java.util.Map; public class KafkaSupervisorReport extends SupervisorReport { @@ -40,13 +41,21 @@ public class KafkaSupervisorReport extends SupervisorReport private final Long durationSeconds; private final List activeTasks; private final List publishingTasks; + private final Map latestOffsets; + private final Map minimumLag; + private final Long aggregateLag; + private final DateTime offsetsLastUpdated; public KafkaSupervisorReportPayload( String dataSource, String topic, Integer partitions, Integer replicas, - Long durationSeconds + Long durationSeconds, + @Nullable Map latestOffsets, + @Nullable Map minimumLag, + @Nullable Long aggregateLag, + @Nullable DateTime offsetsLastUpdated ) { this.dataSource = dataSource; @@ -56,6 +65,10 @@ public class KafkaSupervisorReport extends SupervisorReport this.durationSeconds = durationSeconds; this.activeTasks = Lists.newArrayList(); this.publishingTasks = Lists.newArrayList(); + this.latestOffsets = latestOffsets; + this.minimumLag = minimumLag; + this.aggregateLag = aggregateLag; + this.offsetsLastUpdated = offsetsLastUpdated; } @JsonProperty @@ -100,6 +113,33 @@ public class KafkaSupervisorReport extends SupervisorReport return publishingTasks; } + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getLatestOffsets() + { + return latestOffsets; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getMinimumLag() + { + return minimumLag; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getAggregateLag() + { + return aggregateLag; + } + + @JsonProperty + public DateTime getOffsetsLastUpdated() + { + return offsetsLastUpdated; + } + @Override public String toString() { @@ -111,6 +151,10 @@ public class KafkaSupervisorReport extends SupervisorReport ", durationSeconds=" + durationSeconds + ", active=" + activeTasks + ", publishing=" + publishingTasks + + (latestOffsets != null ? ", latestOffsets=" + latestOffsets : "") + + (minimumLag != null ? ", minimumLag=" + minimumLag : "") + + (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") + + (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") + '}'; } } @@ -123,11 +167,25 @@ public class KafkaSupervisorReport extends SupervisorReport String topic, Integer partitions, Integer replicas, - Long durationSeconds + Long durationSeconds, + @Nullable Map latestOffsets, + @Nullable Map minimumLag, + @Nullable Long aggregateLag, + @Nullable DateTime offsetsLastUpdated ) { super(dataSource, generationTime); - this.payload = new KafkaSupervisorReportPayload(dataSource, topic, partitions, replicas, durationSeconds); + this.payload = new KafkaSupervisorReportPayload( + dataSource, + topic, + partitions, + replicas, + durationSeconds, + latestOffsets, + minimumLag, + aggregateLag, + offsetsLastUpdated + ); } @Override diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 2bcd5c70ccf..c476b05e105 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -87,6 +87,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index c04771a6ae6..39b9666396a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -34,6 +34,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig private final Long chatRetries; private final Duration httpTimeout; private final Duration shutdownTimeout; + private final Duration offsetFetchPeriod; public KafkaSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @@ -50,7 +51,8 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig @JsonProperty("chatThreads") Integer chatThreads, @JsonProperty("chatRetries") Long chatRetries, @JsonProperty("httpTimeout") Period httpTimeout, - @JsonProperty("shutdownTimeout") Period shutdownTimeout + @JsonProperty("shutdownTimeout") Period shutdownTimeout, + @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod ) { super( @@ -73,6 +75,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig this.chatRetries = (chatRetries != null ? chatRetries : 8); this.httpTimeout = defaultDuration(httpTimeout, "PT10S"); this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S"); + this.offsetFetchPeriod = defaultDuration(offsetFetchPeriod, "PT30S"); } @JsonProperty @@ -105,6 +108,12 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig return shutdownTimeout; } + @JsonProperty + public Duration getOffsetFetchPeriod() + { + return offsetFetchPeriod; + } + @Override public String toString() { @@ -124,6 +133,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig ", chatRetries=" + chatRetries + ", httpTimeout=" + httpTimeout + ", shutdownTimeout=" + shutdownTimeout + + ", offsetFetchPeriod=" + offsetFetchPeriod + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java index dcb4fd474d5..923f69ec401 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java @@ -19,9 +19,11 @@ package io.druid.indexing.kafka.supervisor; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.Map; public class TaskReportData @@ -36,15 +38,17 @@ public class TaskReportData private final DateTime startTime; private final Long remainingSeconds; private final TaskType type; - private Map currentOffsets; + private final Map currentOffsets; + private final Map lag; public TaskReportData( String id, - Map startingOffsets, - Map currentOffsets, + @Nullable Map startingOffsets, + @Nullable Map currentOffsets, DateTime startTime, Long remainingSeconds, - TaskType type + TaskType type, + @Nullable Map lag ) { this.id = id; @@ -53,6 +57,7 @@ public class TaskReportData this.startTime = startTime; this.remainingSeconds = remainingSeconds; this.type = type; + this.lag = lag; } @JsonProperty @@ -62,22 +67,19 @@ public class TaskReportData } @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) public Map getStartingOffsets() { return startingOffsets; } @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) public Map getCurrentOffsets() { return currentOffsets; } - public void setCurrentOffsets(Map currentOffsets) - { - this.currentOffsets = currentOffsets; - } - @JsonProperty public DateTime getStartTime() { @@ -96,6 +98,13 @@ public class TaskReportData return type; } + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getLag() + { + return lag; + } + @Override public String toString() { @@ -105,6 +114,7 @@ public class TaskReportData (currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") + ", startTime=" + startTime + ", remainingSeconds=" + remainingSeconds + + (lag != null ? ", lag=" + lag : "") + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 6d2c8f66dbe..1c5c0239010 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -194,7 +194,8 @@ public class KafkaSupervisorTest extends EasyMockSupport TEST_CHAT_THREADS, TEST_CHAT_RETRIES, TEST_HTTP_TIMEOUT, - TEST_SHUTDOWN_TIMEOUT + TEST_SHUTDOWN_TIMEOUT, + null ); topic = getTopic(); @@ -999,6 +1000,7 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1088,6 +1090,7 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1142,7 +1145,7 @@ public class KafkaSupervisorTest extends EasyMockSupport final DateTime startTime = new DateTime(); supervisor = getSupervisor(1, 1, true, "PT1H", null, false); - addSomeEvents(1); + addSomeEvents(6); Task id1 = createKafkaIndexTask( "id1", @@ -1157,7 +1160,7 @@ public class KafkaSupervisorTest extends EasyMockSupport "id2", DATASOURCE, "sequenceName-0", - new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null ); @@ -1183,16 +1186,17 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); expect(taskClient.getCurrentOffsetsAsync("id1", false)) - .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 10L, 1, 20L, 2, 30L))); - expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 1L, 1, 2L, 2, 3L))); + expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)); expect(taskClient.getCurrentOffsetsAsync("id2", false)) - .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 40L, 1, 50L, 2, 60L))); + .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 4L, 1, 5L, 2, 6L))); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); supervisor.start(); supervisor.runInternal(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1215,12 +1219,19 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("id2", activeReport.getId()); Assert.assertEquals(startTime, activeReport.getStartTime()); - Assert.assertEquals(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), activeReport.getStartingOffsets()); - Assert.assertEquals(ImmutableMap.of(0, 40L, 1, 50L, 2, 60L), activeReport.getCurrentOffsets()); + Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), activeReport.getStartingOffsets()); + Assert.assertEquals(ImmutableMap.of(0, 4L, 1, 5L, 2, 6L), activeReport.getCurrentOffsets()); + Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), activeReport.getLag()); Assert.assertEquals("id1", publishingReport.getId()); Assert.assertEquals(ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), publishingReport.getStartingOffsets()); - Assert.assertEquals(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), publishingReport.getCurrentOffsets()); + Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), publishingReport.getCurrentOffsets()); + Assert.assertEquals(null, publishingReport.getLag()); + + Assert.assertEquals(ImmutableMap.of(0, 6L, 1, 6L, 2, 6L), payload.getLatestOffsets()); + Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), payload.getMinimumLag()); + Assert.assertEquals(3L, (long) payload.getAggregateLag()); + Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 65a862ec63d..a7b69db857d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -71,6 +71,7 @@ public class KafkaSupervisorTuningConfigTest Assert.assertEquals(8L, (long) config.getChatRetries()); Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout()); + Assert.assertEquals(Duration.standardSeconds(30), config.getOffsetFetchPeriod()); } @Test @@ -90,7 +91,8 @@ public class KafkaSupervisorTuningConfigTest + " \"chatThreads\": 13,\n" + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" - + " \"shutdownTimeout\": \"PT95S\"\n" + + " \"shutdownTimeout\": \"PT95S\",\n" + + " \"offsetFetchPeriod\": \"PT20S\"\n" + "}"; KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue( @@ -116,5 +118,6 @@ public class KafkaSupervisorTuningConfigTest Assert.assertEquals(14L, (long) config.getChatRetries()); Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout()); + Assert.assertEquals(Duration.standardSeconds(20), config.getOffsetFetchPeriod()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 7f467079215..eb7bdaf99c0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -24,10 +24,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; - import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.java.util.common.Pair; -import io.druid.java.util.common.collect.JavaCompatUtils; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.metadata.MetadataSupervisorManager; @@ -59,7 +57,7 @@ public class SupervisorManager public Set getSupervisorIds() { - return JavaCompatUtils.keySet(supervisors); + return supervisors.keySet(); } public Optional getSupervisorSpec(String id) @@ -117,7 +115,7 @@ public class SupervisorManager Preconditions.checkState(started, "SupervisorManager not started"); synchronized (lock) { - for (String id : JavaCompatUtils.keySet(supervisors)) { + for (String id : supervisors.keySet()) { try { supervisors.get(id).lhs.stop(false); } diff --git a/java-util/src/main/java/io/druid/java/util/common/collect/JavaCompatUtils.java b/java-util/src/main/java/io/druid/java/util/common/collect/JavaCompatUtils.java deleted file mode 100644 index 95a307bee67..00000000000 --- a/java-util/src/main/java/io/druid/java/util/common/collect/JavaCompatUtils.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.collect; - -import java.util.Map; -import java.util.Set; - -public class JavaCompatUtils -{ - /** - * Equivalent to theMap.keySet(), but works around a Java 7 compat issue. See also - * https://github.com/druid-io/druid/issues/3795. - */ - public static Set keySet(Map theMap) - { - return theMap.keySet(); - } -}