Add segment handoff time metric (#13238)

* Add segment handoff time metric

* Remove monitors on scheduler stop

* Add warning log for slow handoff

* Remove monitor when scheduler stops
This commit is contained in:
AmatyaAvadhanula 2022-11-07 17:49:10 +05:30 committed by GitHub
parent 227b57dd8e
commit 650840ddaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 164 additions and 222 deletions

View File

@ -52,8 +52,7 @@ public class BasicMonitorScheduler extends MonitorScheduler
exec,
getConfig().getEmitterPeriod(),
() -> {
// Run one more time even if the monitor was removed, in case there's some extra data to flush
if (monitor.monitor(getEmitter()) && hasMonitor(monitor)) {
if (hasMonitor(monitor) && monitor.monitor(getEmitter())) {
return Signal.REPEAT;
} else {
removeMonitor(monitor);

View File

@ -24,8 +24,10 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@ -35,13 +37,15 @@ import java.util.Set;
*/
public abstract class MonitorScheduler
{
private static final Logger log = new Logger(MonitorScheduler.class);
private final MonitorSchedulerConfig config;
private final ServiceEmitter emitter;
private final Set<Monitor> monitors;
private final Object lock = new Object();
private volatile boolean started = false;
MonitorScheduler(
MonitorSchedulerConfig config,
ServiceEmitter emitter,
@ -86,6 +90,13 @@ public abstract class MonitorScheduler
{
synchronized (lock) {
monitors.remove(monitor);
// Stop the monitor only after emitting the last round of metrics
try {
monitor.monitor(emitter);
}
catch (Throwable t) {
log.warn(t, "Monitor could not emit finally before being removed from scheduler");
}
monitor.stop();
}
}
@ -110,8 +121,9 @@ public abstract class MonitorScheduler
}
started = false;
for (Monitor monitor : monitors) {
monitor.stop();
List<Monitor> monitorsCopy = new ArrayList<>(monitors);
for (Monitor monitor : monitorsCopy) {
removeMonitor(monitor);
}
}
}

View File

@ -82,9 +82,9 @@ public class BasicMonitorSchedulerTest
);
scheduler.start();
Thread.sleep(100);
// monitor.monitor() is called 5 times since a new task is scheduled first and then the current one is executed.
// monitor.monitor() is called at least 5 times since a new task is scheduled first and then the current one is executed.
// See ScheduledExecutors.scheduleAtFixedRate() for details.
Mockito.verify(monitor, Mockito.times(5)).monitor(ArgumentMatchers.any());
Mockito.verify(monitor, Mockito.atLeast(5)).monitor(ArgumentMatchers.any());
scheduler.stop();
}
@ -102,8 +102,8 @@ public class BasicMonitorSchedulerTest
exec
);
scheduler.start();
Thread.sleep(100);
// monitor.monitor() is called 5 times since a new task is scheduled first and then the current one is executed.
Thread.sleep(1000);
// monitor.monitor() is called at least twice
// See ScheduledExecutors.scheduleAtFixedRate() for details.
Mockito.verify(monitor, Mockito.atLeast(2)).monitor(ArgumentMatchers.any());
scheduler.stop();

View File

@ -235,7 +235,7 @@ public class ClockDriftSafeMonitorSchedulerTest
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any());
Mockito.verify(monitor, Mockito.times(1)).stop();
scheduler.stop();
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.Test;
import java.io.IOException;
public class MonitorSchedulerTest
{
@Test
public void testMonitorAndStopOnRemove() throws IOException
{
MonitorSchedulerConfig infiniteFlushDelayConfig = new MonitorSchedulerConfig()
{
@Override
public Duration getEmitterPeriod()
{
return Duration.millis(Long.MAX_VALUE);
}
};
ServiceEmitter emitter = EasyMock.mock(ServiceEmitter.class);
Monitor monitor = new AbstractMonitor()
{
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
try {
emitter.flush();
return true;
}
catch (Throwable t) {
return false;
}
}
};
MonitorScheduler scheduler = new BasicMonitorScheduler(
infiniteFlushDelayConfig,
emitter,
ImmutableList.of(monitor),
Execs.scheduledSingleThreaded("MonitorScheduler-%s")
);
scheduler.start();
// Expect an emitter flush, despite infinite scheduler duration, when monitor is removed
emitter.flush();
EasyMock.expectLastCall().times(1);
EasyMock.replay(emitter);
scheduler.removeMonitor(monitor);
EasyMock.verify(emitter);
}
}

View File

@ -50,8 +50,6 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
private FireDepartmentMetrics previousFireDepartmentMetrics;
private RowIngestionMetersTotals previousRowIngestionMetersTotals;
private volatile boolean lastRoundMetricsToBePushed = false;
public TaskRealtimeMetricsMonitor(
FireDepartment fireDepartment,
RowIngestionMeters rowIngestionMeters,
@ -65,27 +63,6 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0);
}
@Override
public void start()
{
super.start();
lastRoundMetricsToBePushed = true;
}
@Override
public boolean monitor(ServiceEmitter emitter)
{
if (isStarted()) {
return doMonitor(emitter);
} else if (lastRoundMetricsToBePushed) {
// Run one more time even if the monitor was removed, in case there's some extra data to flush
lastRoundMetricsToBePushed = false;
return doMonitor(emitter);
}
return false;
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
@ -144,6 +121,11 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
emitter.emit(builder.build("ingest/events/messageGap", metrics.messageGap()));
long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
if (maxSegmentHandoffTime >= 0) {
emitter.emit(builder.build("ingest/handoff/time", maxSegmentHandoffTime));
}
previousRowIngestionMetersTotals = rowIngestionMetersTotals;
previousFireDepartmentMetrics = metrics;
return true;

View File

@ -1,83 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.stats;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TaskRealtimeMetricsMonitorTest
{
private ServiceEmitter emitter;
private FireDepartment fireDepartment;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup()
{
emitter = EasyMock.mock(ServiceEmitter.class);
fireDepartment = EasyMock.mock(FireDepartment.class);
}
@Test
public void testLastRoundMetricsEmission()
{
FireDepartmentMetrics metrics = new FireDepartmentMetrics();
RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
DataSchema schema = new DataSchema("dataSource", null, null, null, null, null, null, null);
EasyMock.expect(fireDepartment.getMetrics()).andReturn(metrics);
EasyMock.expectLastCall().times(2);
EasyMock.expect(fireDepartment.getDataSchema()).andReturn(schema);
EasyMock.expectLastCall().times(2);
EasyMock.replay(fireDepartment);
TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(fireDepartment, rowIngestionMeters, ImmutableMap.of());
Assert.assertFalse(monitor.isStarted());
boolean zerothRound = monitor.monitor(emitter);
monitor.start();
Assert.assertTrue(monitor.isStarted());
boolean firstRound = monitor.monitor(emitter);
monitor.stop();
Assert.assertFalse(monitor.isStarted());
boolean secondRound = monitor.monitor(emitter);
boolean thirdRound = monitor.monitor(emitter);
Assert.assertFalse(zerothRound);
Assert.assertTrue(firstRound && secondRound);
Assert.assertFalse(thirdRound);
EasyMock.verify(fireDepartment);
}
}

View File

@ -1160,9 +1160,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
{
publishCountDown = new CountDownLatch(1);
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expectLastCall().times(1);
monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.expectLastCall().times(1);
EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate);
RealtimeIndexTask realtimeIndexTask = newRealtimeIndexTask();
@ -1240,9 +1240,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
taskQueue = setUpTaskQueue(taskStorage, taskRunner);
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expectLastCall().times(1);
monitorScheduler.removeMonitor(EasyMock.anyObject(Monitor.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.expectLastCall().times(1);
EasyMock.replay(monitorScheduler, queryRunnerFactoryConglomerate);
RealtimeIndexTask realtimeIndexTask = newRealtimeIndexTask();

View File

@ -29,6 +29,8 @@ public class FireDepartmentMetrics
{
private static final long DEFAULT_PROCESSING_COMPLETION_TIME = -1L;
private static final long DEFAULT_SEGMENT_HANDOFF_TIME = -1L;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong processedWithErrorsCount = new AtomicLong(0);
private final AtomicLong thrownAwayCount = new AtomicLong(0);
@ -49,6 +51,8 @@ public class FireDepartmentMetrics
private final AtomicLong messageGap = new AtomicLong(0);
private final AtomicLong messageProcessingCompletionTime = new AtomicLong(DEFAULT_PROCESSING_COMPLETION_TIME);
private final AtomicLong maxSegmentHandoffTime = new AtomicLong(DEFAULT_SEGMENT_HANDOFF_TIME);
public void incrementProcessed()
{
processedCount.incrementAndGet();
@ -134,6 +138,11 @@ public class FireDepartmentMetrics
this.messageMaxTimestamp.set(Math.max(messageMaxTimestamp, this.messageMaxTimestamp.get()));
}
public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
{
this.maxSegmentHandoffTime.set(Math.max(maxSegmentHandoffTime, this.maxSegmentHandoffTime.get()));
}
public void markProcessingDone()
{
markProcessingDone(System.currentTimeMillis());
@ -241,6 +250,11 @@ public class FireDepartmentMetrics
return messageGap.get();
}
public long maxSegmentHandoffTime()
{
return maxSegmentHandoffTime.get();
}
public FireDepartmentMetrics snapshot()
{
final FireDepartmentMetrics retVal = new FireDepartmentMetrics();
@ -261,10 +275,19 @@ public class FireDepartmentMetrics
retVal.handOffCount.set(handOffCount.get());
retVal.sinkCount.set(sinkCount.get());
retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
retVal.messageProcessingCompletionTime.set(messageProcessingCompletionTime.get());
retVal.messageProcessingCompletionTime.compareAndSet(DEFAULT_PROCESSING_COMPLETION_TIME, System.currentTimeMillis());
long maxTimestamp = retVal.messageMaxTimestamp.get();
retVal.messageGap.set(maxTimestamp > 0 ? retVal.messageProcessingCompletionTime.get() - maxTimestamp : 0L);
reset();
return retVal;
}
private void reset()
{
maxSegmentHandoffTime.set(DEFAULT_SEGMENT_HANDOFF_TIME);
}
}

View File

@ -46,8 +46,6 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
private final List<FireDepartment> fireDepartments;
private final Map<String, String[]> dimensions;
private volatile boolean lastRoundMetricsToBePushed = false;
@Inject
public RealtimeMetricsMonitor(List<FireDepartment> fireDepartments)
{
@ -61,27 +59,6 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
this.dimensions = ImmutableMap.copyOf(dimensions);
}
@Override
public void start()
{
super.start();
lastRoundMetricsToBePushed = true;
}
@Override
public boolean monitor(ServiceEmitter emitter)
{
if (isStarted()) {
return doMonitor(emitter);
} else if (lastRoundMetricsToBePushed) {
// Run one more time even if the monitor was removed, in case there's some extra data to flush
lastRoundMetricsToBePushed = false;
return doMonitor(emitter);
}
return false;
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
@ -137,6 +114,12 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
emitter.emit(builder.build("ingest/handoff/count", metrics.handOffCount() - previous.handOffCount()));
emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
emitter.emit(builder.build("ingest/events/messageGap", metrics.messageGap()));
long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
if (maxSegmentHandoffTime >= 0) {
emitter.emit(builder.build("ingest/handoff/time", maxSegmentHandoffTime));
}
previousValues.put(fireDepartment, metrics);
}

View File

@ -74,6 +74,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
{
private static final Logger log = new Logger(StreamAppenderatorDriver.class);
private static final long HANDOFF_TIME_THRESHOLD = 600_000;
private final SegmentHandoffNotifier handoffNotifier;
private final FireDepartmentMetrics metrics;
private final ObjectMapper objectMapper;
@ -332,6 +334,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
}
log.debug("Register handoff of segments: [%s]", waitingSegmentIdList);
final long handoffStartTime = System.currentTimeMillis();
final SettableFuture<SegmentsAndCommitMetadata> resultFuture = SettableFuture.create();
final AtomicInteger numRemainingHandoffSegments = new AtomicInteger(waitingSegmentIdList.size());
@ -358,7 +361,13 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
{
if (numRemainingHandoffSegments.decrementAndGet() == 0) {
List<DataSegment> segments = segmentsAndCommitMetadata.getSegments();
log.debug("Successfully handed off [%d] segments.", segments.size());
log.info("Successfully handed off [%d] segments.", segments.size());
final long handoffTotalTime = System.currentTimeMillis() - handoffStartTime;
metrics.reportMaxSegmentHandoffTime(handoffTotalTime);
if (handoffTotalTime > HANDOFF_TIME_THRESHOLD) {
log.warn("Slow segment handoff! Time taken for [%d] segments is %d ms",
segments.size(), handoffTotalTime);
}
resultFuture.set(
new SegmentsAndCommitMetadata(
segments,

View File

@ -36,15 +36,21 @@ public class FireDepartmentMetricsTest
@Test
public void testSnapshotBeforeProcessing()
{
Assert.assertEquals(0L, metrics.snapshot().messageGap());
FireDepartmentMetrics snapshot = metrics.snapshot();
Assert.assertEquals(0L, snapshot.messageGap());
// invalid value
Assert.assertTrue(0 > snapshot.maxSegmentHandoffTime());
}
@Test
public void testSnapshotAfterProcessingOver()
{
metrics.reportMessageMaxTimestamp(10);
metrics.reportMaxSegmentHandoffTime(7L);
metrics.markProcessingDone(30L);
Assert.assertEquals(20, metrics.snapshot().messageGap());
FireDepartmentMetrics snapshot = metrics.snapshot();
Assert.assertEquals(20, snapshot.messageGap());
Assert.assertEquals(7, snapshot.maxSegmentHandoffTime());
}
@Test
@ -60,9 +66,14 @@ public class FireDepartmentMetricsTest
public void testProcessingOverAfterSnapshot()
{
metrics.reportMessageMaxTimestamp(10);
metrics.reportMaxSegmentHandoffTime(7L);
// Should reset to invalid value
metrics.snapshot();
metrics.markProcessingDone(20);
Assert.assertEquals(10, metrics.snapshot().messageGap());
FireDepartmentMetrics snapshot = metrics.snapshot();
Assert.assertEquals(10, snapshot.messageGap());
// value must be invalid
Assert.assertTrue(0 > snapshot.maxSegmentHandoffTime());
}
@Test

View File

@ -1,75 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.indexing.DataSchema;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class RealtimeMetricsMonitorTest
{
private ServiceEmitter emitter;
private FireDepartment fireDepartment;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup()
{
emitter = EasyMock.mock(ServiceEmitter.class);
fireDepartment = EasyMock.mock(FireDepartment.class);
}
@Test
public void testLastRoundMetricsEmission()
{
FireDepartmentMetrics metrics = new FireDepartmentMetrics();
DataSchema schema = new DataSchema("dataSource", null, null, null, null, null, null, null);
EasyMock.expect(fireDepartment.getMetrics()).andReturn(metrics);
EasyMock.expectLastCall().times(2);
EasyMock.expect(fireDepartment.getDataSchema()).andReturn(schema);
EasyMock.expectLastCall().times(2);
EasyMock.replay(fireDepartment);
RealtimeMetricsMonitor monitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
Assert.assertFalse(monitor.isStarted());
boolean zerothRound = monitor.monitor(emitter);
monitor.start();
Assert.assertTrue(monitor.isStarted());
boolean firstRound = monitor.monitor(emitter);
monitor.stop();
Assert.assertFalse(monitor.isStarted());
boolean secondRound = monitor.monitor(emitter);
boolean thirdRound = monitor.monitor(emitter);
Assert.assertFalse(zerothRound);
Assert.assertTrue(firstRound && secondRound);
Assert.assertFalse(thirdRound);
EasyMock.verify(fireDepartment);
}
}