From f2495a67d206534b6ee79b083ce49b3131165192 Mon Sep 17 00:00:00 2001 From: Yuanli Han <44718283+yuanlihan@users.noreply.github.com> Date: Tue, 29 Mar 2022 00:21:06 +0800 Subject: [PATCH] fix messageGap metric (#12337) --- .../druid/segment/realtime/FireDepartmentMetrics.java | 3 ++- .../druid/segment/realtime/FireDepartmentMetricsTest.java | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java b/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java index 9fd286428d0..0811a3434f7 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java @@ -263,7 +263,8 @@ public class FireDepartmentMetrics retVal.messageMaxTimestamp.set(messageMaxTimestamp.get()); retVal.messageProcessingCompletionTime.set(messageProcessingCompletionTime.get()); retVal.messageProcessingCompletionTime.compareAndSet(DEFAULT_PROCESSING_COMPLETION_TIME, System.currentTimeMillis()); - retVal.messageGap.set(retVal.messageProcessingCompletionTime.get() - messageMaxTimestamp.get()); + long maxTimestamp = retVal.messageMaxTimestamp.get(); + retVal.messageGap.set(maxTimestamp > 0 ? retVal.messageProcessingCompletionTime.get() - maxTimestamp : 0L); return retVal; } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java index 0c1025161b1..17d4356acd3 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java @@ -33,6 +33,12 @@ public class FireDepartmentMetricsTest metrics = new FireDepartmentMetrics(); } + @Test + public void testSnapshotBeforeProcessing() + { + Assert.assertEquals(0L, metrics.snapshot().messageGap()); + } + @Test public void testSnapshotAfterProcessingOver() {