NIFI-534:

- Getting component metrics from Nodes on a set interval using their most recent heartbeat in order to ensure there is a data point for each node for each window.
- Updating the snapshot reduce method to use the timestamp from the first snapshot. This is then the timestamp for the aggregate snapshot.
This commit is contained in:
Matt Gilman 2015-04-23 10:13:05 -04:00
parent a53cc3d707
commit f5479428e5
2 changed files with 42 additions and 17 deletions

View File

@ -398,6 +398,37 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
}
componentStatusSnapshotMillis = snapshotMillis;
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
for (final Node node : nodes) {
if (Status.CONNECTED.equals(node.getStatus())) {
ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
statusRepository = createComponentStatusRepository();
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
}
// ensure this node has a payload
if (node.getHeartbeat() != null && node.getHeartbeatPayload() != null) {
// if nothing has been captured or the current heartbeat is newer, capture it - comparing the heatbeat created timestamp
// is safe since its marked as XmlTransient so we're assured that its based off the same clock that created the last capture date
if (statusRepository.getLastCaptureDate() == null || node.getHeartbeat().getCreatedTimestamp() > statusRepository.getLastCaptureDate().getTime()) {
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
}
}
}
}
} catch(final Throwable t) {
logger.warn("Unable to capture component metrics from Node heartbeats: " + t);
} finally {
readLock.unlock("capture component metrics from node heartbeats");
}
}
}, componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS);
remoteInputPort = properties.getRemoteInputPort();
if (remoteInputPort == null) {
@ -1807,20 +1838,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// record heartbeat
node.setHeartbeat(mostRecentHeartbeat);
ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
statusRepository = createComponentStatusRepository();
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
}
// If it's been a while since we've captured, capture this metric.
final Date lastCaptureDate = statusRepository.getLastCaptureDate();
final long millisSinceLastCapture = (lastCaptureDate == null) ? Long.MAX_VALUE : (System.currentTimeMillis() - lastCaptureDate.getTime());
if (millisSinceLastCapture > componentStatusSnapshotMillis) {
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
}
}
} catch (final Exception e) {
logger.error("Failed to process heartbeat from {}:{} due to {}", mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
@ -3877,11 +3894,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// Aggregate the snapshots
final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new ArrayList<>();
for (final Map.Entry<Date, List<StatusSnapshot>> entry : snapshotsToAggregate.entrySet()) {
final StatusSnapshotDTO dto = new StatusSnapshotDTO();
dto.setTimestamp(entry.getKey());
final List<StatusSnapshot> snapshots = entry.getValue();
final StatusSnapshot reducedSnapshot = snapshots.get(0).getValueReducer().reduce(snapshots);
final StatusSnapshotDTO dto = new StatusSnapshotDTO();
dto.setTimestamp(reducedSnapshot.getTimestamp());
dto.setStatusMetrics(StatusHistoryUtil.createStatusSnapshotDto(reducedSnapshot).getStatusMetrics());
aggregatedSnapshotDtos.add(dto);

View File

@ -51,12 +51,20 @@ public class StandardStatusSnapshot implements StatusSnapshot {
return new ValueReducer<StatusSnapshot, StatusSnapshot>() {
@Override
public StatusSnapshot reduce(final List<StatusSnapshot> values) {
Date reducedTimestamp = null;
final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>(metricValues.keySet());
for (final StatusSnapshot statusSnapshot : values) {
if (reducedTimestamp == null) {
reducedTimestamp = statusSnapshot.getTimestamp();
}
allDescriptors.addAll(statusSnapshot.getStatusMetrics().keySet());
}
final StandardStatusSnapshot reduced = new StandardStatusSnapshot();
if (reducedTimestamp != null) {
reduced.setTimestamp(reducedTimestamp);
}
for (final MetricDescriptor<?> descriptor : allDescriptors) {
final Long descriptorValue = descriptor.getValueReducer().reduce(values);