From 609833c97bb44fee50f4fb84e44ad7c6cb053714 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Wed, 5 Jul 2023 14:44:23 +0530 Subject: [PATCH] Do not emit negative lag because of stale offsets (#14292) The latest topic offsets are polled frequently and used to determine the lag based on the current offsets. However, when the offsets are stale (which can happen due to connection issues commonly), we may see a negative lag . This PR prevents emission of metrics when the offsets are stale and at least one of the partitions has a negative lag. --- .../supervisor/SeekableStreamSupervisor.java | 15 +++++++++++ .../SeekableStreamSupervisorStateTest.java | 25 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0ae9aad9631..d99f84c5746 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -4220,6 +4220,21 @@ public abstract class SeekableStreamSupervisor 0 && partitionLags.values().stream().anyMatch(x -> x < 0)) { + // Log at most once every twenty supervisor runs to reduce noise in the logs + if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) { + log.warn("Lag is negative and will not be emitted because topic offsets have become stale. " + + "This will not impact data processing. " + + "Offsets may become stale because of connectivity issues."); + } + return; + } + LagStats lagStats = computeLags(partitionLags); Map metricTags = spec.getContextValue(DruidMetrics.TAGS); for (Map.Entry entry : partitionLags.entrySet()) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index b8d5a556eef..a347541a4ef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -62,6 +62,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -1035,6 +1036,30 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport ); } + @Test + public void testStaleOffsetsNegativeLagNotEmitted() throws Exception + { + expectEmitterSupervisor(false); + + CountDownLatch latch = new CountDownLatch(1); + + final TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( + latch, + TestEmittingTestSeekableStreamSupervisor.LAG, + // Record lag must not be emitted + ImmutableMap.of("0", 10L, "1", -100L), + null + ); + supervisor.start(); + // Forcibly set the offsets to be stale + supervisor.sequenceLastUpdated = DateTimes.nowUtc().minus(Integer.MAX_VALUE); + + latch.await(); + + supervisor.emitLag(); + Assert.assertEquals(0, emitter.getEvents().size()); + } + private List filterMetrics(List events, List whitelist) { List result = events.stream()