mirror of https://github.com/apache/druid.git
report message gap, source gap and sink count in RealtimePlumber (#3744)
* report message gap, source gap and sink count in RealtimePlumber * report message gap, sink count in Appenderator * add ingest/events/sourceGap in metrics.md * remove source gap
This commit is contained in:
parent
07384d6f40
commit
4be3eb0ce7
|
@ -109,6 +109,8 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th
|
||||||
|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource.|Depends on configuration. Generally a few minutes at most.|
|
|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource.|Depends on configuration. Generally a few minutes at most.|
|
||||||
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|
||||||
|`ingest/handoff/count`|Number of handoffs that happened.|dataSource.|Varies. Generally greater than 0 once every segment granular period if cluster operating normally|
|
|`ingest/handoff/count`|Number of handoffs that happened.|dataSource.|Varies. Generally greater than 0 once every segment granular period if cluster operating normally|
|
||||||
|
|`ingest/sink/count`|Number of sinks that have not been handed off yet.|dataSource.|1~3|
|
||||||
|
|`ingest/events/messageGap`|Time gap, in milliseconds, between the latest event timestamp seen and the current system time.|dataSource.|Greater than 0; depends on the timestamps carried in the events.|
|
||||||
|
|
||||||
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
|
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,9 @@ public class FireDepartmentMetrics
|
||||||
private final AtomicLong mergeCpuTime = new AtomicLong(0);
|
private final AtomicLong mergeCpuTime = new AtomicLong(0);
|
||||||
private final AtomicLong persistCpuTime = new AtomicLong(0);
|
private final AtomicLong persistCpuTime = new AtomicLong(0);
|
||||||
private final AtomicLong handOffCount = new AtomicLong(0);
|
private final AtomicLong handOffCount = new AtomicLong(0);
|
||||||
|
private final AtomicLong sinkCount = new AtomicLong(0);
|
||||||
|
private final AtomicLong messageMaxTimestamp = new AtomicLong(0);
|
||||||
|
private final AtomicLong messageGap = new AtomicLong(0);
|
||||||
|
|
||||||
public void incrementProcessed()
|
public void incrementProcessed()
|
||||||
{
|
{
|
||||||
|
@ -103,6 +106,14 @@ public class FireDepartmentMetrics
|
||||||
handOffCount.incrementAndGet();
|
handOffCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSinkCount(long sinkCount){
|
||||||
|
this.sinkCount.set(sinkCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reportMessageMaxTimestamp(long messageMaxTimestamp){
|
||||||
|
this.messageMaxTimestamp.set(Math.max(messageMaxTimestamp, this.messageMaxTimestamp.get()));
|
||||||
|
}
|
||||||
|
|
||||||
public long processed()
|
public long processed()
|
||||||
{
|
{
|
||||||
return processedCount.get();
|
return processedCount.get();
|
||||||
|
@ -168,6 +179,20 @@ public class FireDepartmentMetrics
|
||||||
return handOffCount.get();
|
return handOffCount.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long sinkCount()
|
||||||
|
{
|
||||||
|
return sinkCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long messageMaxTimestamp()
|
||||||
|
{
|
||||||
|
return messageMaxTimestamp.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long messageGap()
|
||||||
|
{
|
||||||
|
return messageGap.get();
|
||||||
|
}
|
||||||
|
|
||||||
public FireDepartmentMetrics snapshot()
|
public FireDepartmentMetrics snapshot()
|
||||||
{
|
{
|
||||||
|
@ -185,6 +210,9 @@ public class FireDepartmentMetrics
|
||||||
retVal.mergeCpuTime.set(mergeCpuTime.get());
|
retVal.mergeCpuTime.set(mergeCpuTime.get());
|
||||||
retVal.persistCpuTime.set(persistCpuTime.get());
|
retVal.persistCpuTime.set(persistCpuTime.get());
|
||||||
retVal.handOffCount.set(handOffCount.get());
|
retVal.handOffCount.set(handOffCount.get());
|
||||||
|
retVal.sinkCount.set(sinkCount.get());
|
||||||
|
retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
|
||||||
|
retVal.messageGap.set(System.currentTimeMillis() - messageMaxTimestamp.get());
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,6 +238,9 @@ public class FireDepartmentMetrics
|
||||||
mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime());
|
mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime());
|
||||||
persistCpuTime.addAndGet(otherSnapshot.persistCpuTime());
|
persistCpuTime.addAndGet(otherSnapshot.persistCpuTime());
|
||||||
handOffCount.addAndGet(otherSnapshot.handOffCount());
|
handOffCount.addAndGet(otherSnapshot.handOffCount());
|
||||||
|
sinkCount.addAndGet(otherSnapshot.sinkCount());
|
||||||
|
messageMaxTimestamp.set(Math.max(messageMaxTimestamp(), otherSnapshot.messageMaxTimestamp()));
|
||||||
|
messageGap.set(Math.max(messageGap(), otherSnapshot.messageGap()));
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,6 +96,8 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
|
||||||
emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previous.mergeTimeMillis()));
|
emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previous.mergeTimeMillis()));
|
||||||
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previous.mergeCpuTime()));
|
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previous.mergeCpuTime()));
|
||||||
emitter.emit(builder.build("ingest/handoff/count", metrics.handOffCount() - previous.handOffCount()));
|
emitter.emit(builder.build("ingest/handoff/count", metrics.handOffCount() - previous.handOffCount()));
|
||||||
|
emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
|
||||||
|
emitter.emit(builder.build("ingest/events/messageGap", metrics.messageGap()));
|
||||||
previousValues.put(fireDepartment, metrics);
|
previousValues.put(fireDepartment, metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -191,6 +191,7 @@ public class AppenderatorImpl implements Appenderator
|
||||||
}
|
}
|
||||||
|
|
||||||
final Sink sink = getOrCreateSink(identifier);
|
final Sink sink = getOrCreateSink(identifier);
|
||||||
|
metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
|
||||||
final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
|
final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
|
||||||
final int sinkRowsInMemoryAfterAdd;
|
final int sinkRowsInMemoryAfterAdd;
|
||||||
|
|
||||||
|
@ -269,6 +270,7 @@ public class AppenderatorImpl implements Appenderator
|
||||||
}
|
}
|
||||||
|
|
||||||
sinks.put(identifier, retVal);
|
sinks.put(identifier, retVal);
|
||||||
|
metrics.setSinkCount(sinks.size());
|
||||||
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal));
|
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -905,6 +907,7 @@ public class AppenderatorImpl implements Appenderator
|
||||||
|
|
||||||
log.info("Removing sink for segment[%s].", identifier);
|
log.info("Removing sink for segment[%s].", identifier);
|
||||||
sinks.remove(identifier);
|
sinks.remove(identifier);
|
||||||
|
metrics.setSinkCount(sinks.size());
|
||||||
droppingSinks.remove(identifier);
|
droppingSinks.remove(identifier);
|
||||||
sinkTimeline.remove(
|
sinkTimeline.remove(
|
||||||
sink.getInterval(),
|
sink.getInterval(),
|
||||||
|
|
|
@ -204,7 +204,9 @@ public class RealtimePlumber implements Plumber
|
||||||
@Override
|
@Override
|
||||||
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
|
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
|
||||||
{
|
{
|
||||||
final Sink sink = getSink(row.getTimestampFromEpoch());
|
long messageTimestamp = row.getTimestampFromEpoch();
|
||||||
|
final Sink sink = getSink(messageTimestamp);
|
||||||
|
metrics.reportMessageMaxTimestamp(messageTimestamp);
|
||||||
if (sink == null) {
|
if (sink == null) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -716,6 +718,7 @@ public class RealtimePlumber implements Plumber
|
||||||
private void addSink(final Sink sink)
|
private void addSink(final Sink sink)
|
||||||
{
|
{
|
||||||
sinks.put(sink.getInterval().getStartMillis(), sink);
|
sinks.put(sink.getInterval().getStartMillis(), sink);
|
||||||
|
metrics.setSinkCount(sinks.size());
|
||||||
sinkTimeline.add(
|
sinkTimeline.add(
|
||||||
sink.getInterval(),
|
sink.getInterval(),
|
||||||
sink.getVersion(),
|
sink.getVersion(),
|
||||||
|
@ -850,6 +853,7 @@ public class RealtimePlumber implements Plumber
|
||||||
removeSegment(sink, computePersistDir(schema, sink.getInterval()));
|
removeSegment(sink, computePersistDir(schema, sink.getInterval()));
|
||||||
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
|
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
|
||||||
sinks.remove(truncatedTime);
|
sinks.remove(truncatedTime);
|
||||||
|
metrics.setSinkCount(sinks.size());
|
||||||
sinkTimeline.remove(
|
sinkTimeline.remove(
|
||||||
sink.getInterval(),
|
sink.getInterval(),
|
||||||
sink.getVersion(),
|
sink.getVersion(),
|
||||||
|
|
Loading…
Reference in New Issue