mirror of
https://github.com/apache/nifi.git
synced 2025-02-06 01:58:32 +00:00
NIFI-6649 - added separate query interval configuration for observation queries
NIFI-6649 - documentation update NIFI-6649 - add debug logging for score and prediction information NIFI-6649 - fix to ensure counts return minimum value of 0 if not infinite or NaN Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #3719
This commit is contained in:
parent
c187292fd9
commit
8e1452a3f3
@ -241,6 +241,7 @@ public abstract class NiFiProperties {
|
||||
// analytics properties
|
||||
public static final String ANALYTICS_PREDICTION_ENABLED = "nifi.analytics.predict.enabled";
|
||||
public static final String ANALYTICS_PREDICTION_INTERVAL = "nifi.analytics.predict.interval";
|
||||
public static final String ANALYTICS_QUERY_INTERVAL = "nifi.analytics.query.interval";
|
||||
public static final String ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "nifi.analytics.connection.model.implementation";
|
||||
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME= "nifi.analytics.connection.model.score.name";
|
||||
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold";
|
||||
@ -318,6 +319,7 @@ public abstract class NiFiProperties {
|
||||
// analytics defaults
|
||||
public static final String DEFAULT_ANALYTICS_PREDICTION_ENABLED = "false";
|
||||
public static final String DEFAULT_ANALYTICS_PREDICTION_INTERVAL = "3 mins";
|
||||
public static final String DEFAULT_ANALYTICS_QUERY_INTERVAL = "3 mins";
|
||||
public final static String DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares";
|
||||
public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared";
|
||||
public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90;
|
||||
|
@ -2389,7 +2389,7 @@ In order to generate predictions, local status snapshot history is queried to ob
|
||||
|
||||
NiFi evaluates the model's effectiveness before sending prediction information by using the model's R-Squared score by default. One important note: R-Square is a measure of how close the regression line fits the observation data vs. how accurate the prediction will be; therefore there may be some measure of error. If the R-Squared score for the calculated model meets the configured threshold (as defined by `nifi.analytics.connection.model.score.threshold`) then the model will be used for prediction. Otherwise the model will not be used and predictions will not be available until a model is generated with a score that exceeds the threshold. Default R-Squared threshold value is `.9` however this can be tuned based on prediction requirements.
|
||||
|
||||
The prediction interval `nifi.analytics.predict.interval` can be configured to project out further when back pressure will occur. Predictions further out in time require more observations stored locally to generate an effective model. This may also require tuning of the model's scoring threshold value to select a score which can offer reasonable predictions.
|
||||
The prediction interval `nifi.analytics.predict.interval` can be configured to project out further when back pressure will occur. The prediction query interval `nifi.analytics.query.interval` can also be configured to determine how far back in time past observations should be queried in order to generate the model. Adjustments to these settings may require tuning of the model's scoring threshold value to select a score that can offer reasonable predictions.
|
||||
|
||||
See <<analytics_properties>> for complete information on configuring analytic properties.
|
||||
|
||||
@ -3341,6 +3341,7 @@ These properties determine the behavior of the internal NiFi predictive analytic
|
||||
|*Property*|*Description*
|
||||
|`nifi.analytics.predict.enabled`|This indicates whether prediction should be enabled for the cluster. The default is `false`.
|
||||
|`nifi.analytics.predict.interval`|This indicates a time interval for which analytical predictions (queue saturation, e.g.) should be made. The default value is `3 mins`.
|
||||
|`nifi.analytics.query.interval`|This indicates a time interval to query for past observations (e.g. the last 3 minutes of snapshots). The default value is `3 mins`. NOTE: This value should be at least 3 times greater than `nifi.components.status.snapshot.frequency` to ensure enough observations are retrieved for predictions.
|
||||
|`nifi.analytics.connection.model.implementation`|This is the implementation class for the status analytics model used to make connection predictions. The default value is `org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares`.
|
||||
|`nifi.analytics.connection.model.score.name`|This is the name of the scoring type that should be used to evaluate model. The default value is `rSquared`.
|
||||
|`nifi.analytics.connection.model.score.threshold`|This is the threshold for the scoring value (where model score should be above given threshold). The default value is `.9`.
|
||||
|
@ -614,6 +614,17 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
||||
predictionIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// Determine interval for querying past observations
|
||||
final String queryInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_QUERY_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL);
|
||||
long queryIntervalMillis;
|
||||
try {
|
||||
queryIntervalMillis = FormatUtils.getTimeDuration(queryInterval, TimeUnit.MILLISECONDS);
|
||||
} catch (final Exception e) {
|
||||
LOG.warn("Analytics is enabled however could not retrieve value for "+ NiFiProperties.ANALYTICS_QUERY_INTERVAL + ". This property has been set to '"
|
||||
+ NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL + "'");
|
||||
queryIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// Determine score name to use for evaluating model performance
|
||||
String modelScoreName = nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME, NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME);
|
||||
|
||||
@ -632,7 +643,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
||||
.getConnectionStatusModelMap(extensionManager, nifiProperties);
|
||||
|
||||
analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap,
|
||||
predictionIntervalMillis, modelScoreName, modelScoreThreshold);
|
||||
predictionIntervalMillis, queryIntervalMillis, modelScoreName, modelScoreThreshold);
|
||||
}
|
||||
|
||||
eventAccess = new StandardEventAccess(this, flowFileEventRepository);
|
||||
|
@ -40,9 +40,10 @@ public class CachingConnectionStatusAnalyticsEngine extends ConnectionStatusAnal
|
||||
|
||||
public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository,
|
||||
FlowFileEventRepository flowFileEventRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
|
||||
long predictionIntervalMillis, String scoreName, double scoreThreshold) {
|
||||
long predictionIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {
|
||||
|
||||
super(flowManager,statusRepository,flowFileEventRepository,modelMap,predictionIntervalMillis,scoreName,scoreThreshold);
|
||||
super(flowManager, statusRepository, flowFileEventRepository, modelMap, predictionIntervalMillis,
|
||||
queryIntervalMillis, scoreName, scoreThreshold);
|
||||
this.cache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(30, TimeUnit.MINUTES)
|
||||
.build();
|
||||
|
@ -24,6 +24,7 @@ import java.util.NoSuchElementException;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.collections4.MapUtils;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.nifi.connectable.Connection;
|
||||
import org.apache.nifi.controller.flow.FlowManager;
|
||||
@ -56,6 +57,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
private final Boolean supportOnlineLearning;
|
||||
private Boolean extendWindow = false;
|
||||
private long intervalMillis = 3L * 60 * 1000; // Default is 3 minutes
|
||||
private long queryIntervalMillis = 3L * 60 * 1000; //Default is 3 minutes
|
||||
private String scoreName = "rSquared";
|
||||
private double scoreThreshold = .90;
|
||||
|
||||
@ -78,7 +80,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
//Obtain latest observations when available, extend window if needed to obtain minimum observations
|
||||
this.queryWindow = new QueryWindow(extendWindow ? queryWindow.getStartTimeMillis() : queryWindow.getEndTimeMillis(), System.currentTimeMillis());
|
||||
} else {
|
||||
this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
|
||||
this.queryWindow = new QueryWindow(System.currentTimeMillis() - getQueryIntervalMillis(), System.currentTimeMillis());
|
||||
}
|
||||
|
||||
modelMap.forEach((metric, modelFunction) -> {
|
||||
@ -94,6 +96,13 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
try {
|
||||
LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier);
|
||||
model.learn(Stream.of(features), Stream.of(values));
|
||||
|
||||
if(MapUtils.isNotEmpty(model.getScores())){
|
||||
model.getScores().forEach((key, value) -> {
|
||||
LOG.debug("Model Scores for prediction metric {} for connection id {}: {}={} ", metric, connectionIdentifier, key, value);
|
||||
});
|
||||
}
|
||||
|
||||
extendWindow = false;
|
||||
} catch (Exception ex) {
|
||||
LOG.debug("Exception encountered while training model for connection id {}: {}", connectionIdentifier, ex.getMessage());
|
||||
@ -137,6 +146,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
predictFeatures.put(1, inOutRatio);
|
||||
return convertTimePrediction(bytesModel.predictVariable(0, predictFeatures, backPressureBytes), System.currentTimeMillis());
|
||||
} else {
|
||||
LOG.debug("Model is not valid for calculating time back pressure by content size in bytes. Returning -1");
|
||||
return -1L;
|
||||
}
|
||||
}
|
||||
@ -164,6 +174,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
predictFeatures.put(1, inOutRatio);
|
||||
return convertTimePrediction(countModel.predictVariable(0, predictFeatures, backPressureCountThreshold), System.currentTimeMillis());
|
||||
} else {
|
||||
LOG.debug("Model is not valid for calculating time to back pressure by object count. Returning -1");
|
||||
return -1L;
|
||||
}
|
||||
}
|
||||
@ -186,6 +197,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
predictFeatures.add(inOutRatio);
|
||||
return convertCountPrediction(bytesModel.predict(predictFeatures.toArray(new Double[2])));
|
||||
} else {
|
||||
LOG.debug("Model is not valid for predicting content size in bytes for next interval. Returning -1");
|
||||
return -1L;
|
||||
}
|
||||
}
|
||||
@ -208,6 +220,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
predictFeatures.add(inOutRatio);
|
||||
return convertCountPrediction(countModel.predict(predictFeatures.toArray(new Double[2])));
|
||||
} else {
|
||||
LOG.debug("Model is not valid for predicting object count for next interval. Returning -1");
|
||||
return -1L;
|
||||
}
|
||||
|
||||
@ -266,6 +279,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
this.intervalMillis = intervalTimeMillis;
|
||||
}
|
||||
|
||||
public long getQueryIntervalMillis() {
|
||||
return queryIntervalMillis;
|
||||
}
|
||||
|
||||
public void setQueryIntervalMillis(long queryIntervalMillis) {
|
||||
this.queryIntervalMillis = queryIntervalMillis;
|
||||
}
|
||||
|
||||
public String getScoreName() {
|
||||
return scoreName;
|
||||
}
|
||||
@ -334,6 +355,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
*/
|
||||
private Long convertTimePrediction(Double prediction, Long timeMillis) {
|
||||
if (Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < timeMillis) {
|
||||
LOG.debug("Time prediction value is invalid: {}. Returning -1.",prediction);
|
||||
return -1L;
|
||||
} else {
|
||||
return Math.max(0, Math.round(prediction) - timeMillis);
|
||||
@ -346,7 +368,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
|
||||
* @return prediction prediction value converted into valid value for consumption
|
||||
*/
|
||||
private Long convertCountPrediction(Double prediction) {
|
||||
if (Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < 0) {
|
||||
if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
|
||||
LOG.debug("Count prediction value is invalid: {}. Returning -1.",prediction);
|
||||
return -1L;
|
||||
} else {
|
||||
return Math.max(0, Math.round(prediction));
|
||||
|
@ -37,16 +37,19 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
|
||||
protected final FlowFileEventRepository flowFileEventRepository;
|
||||
protected final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap;
|
||||
protected final long predictionIntervalMillis;
|
||||
protected final long queryIntervalMillis;
|
||||
protected final String scoreName;
|
||||
protected final double scoreThreshold;
|
||||
|
||||
public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository,
|
||||
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis, String scoreName, double scoreThreshold) {
|
||||
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis,
|
||||
long queryIntervalMillis, String scoreName, double scoreThreshold) {
|
||||
this.flowManager = flowManager;
|
||||
this.statusRepository = statusRepository;
|
||||
this.flowFileEventRepository = flowFileEventRepository;
|
||||
this.predictionIntervalMillis = predictionIntervalMillis;
|
||||
this.modelMap = modelMap;
|
||||
this.queryIntervalMillis = queryIntervalMillis;
|
||||
this.scoreName = scoreName;
|
||||
this.scoreThreshold = scoreThreshold;
|
||||
}
|
||||
@ -60,6 +63,7 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
|
||||
public StatusAnalytics getStatusAnalytics(String identifier) {
|
||||
ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, flowFileEventRepository, modelMap, identifier, false);
|
||||
connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis);
|
||||
connectionStatusAnalytics.setQueryIntervalMillis(queryIntervalMillis);
|
||||
connectionStatusAnalytics.setScoreName(scoreName);
|
||||
connectionStatusAnalytics.setScoreThreshold(scoreThreshold);
|
||||
connectionStatusAnalytics.refresh();
|
||||
|
@ -33,15 +33,17 @@ public class TestCachingConnectionStatusAnalyticsEngine extends TestStatusAnalyt
|
||||
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
|
||||
ComponentStatusRepository componentStatusRepository,
|
||||
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
|
||||
long predictIntervalMillis, String scoreName, double scoreThreshold) {
|
||||
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {
|
||||
|
||||
return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, predictIntervalMillis, scoreName, scoreThreshold);
|
||||
return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, predictIntervalMillis,
|
||||
queryIntervalMillis, scoreName, scoreThreshold);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCachedStatusAnalytics() {
|
||||
StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, modelMap,
|
||||
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
|
||||
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS,
|
||||
DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
|
||||
StatusAnalytics statusAnalyticsA = statusAnalyticsEngine.getStatusAnalytics("A");
|
||||
StatusAnalytics statusAnalyticsB = statusAnalyticsEngine.getStatusAnalytics("B");
|
||||
StatusAnalytics statusAnalyticsTest = statusAnalyticsEngine.getStatusAnalytics("A");
|
||||
|
@ -28,8 +28,9 @@ public class TestConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngi
|
||||
@Override
|
||||
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
|
||||
ComponentStatusRepository statusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
|
||||
long predictIntervalMillis, String scoreName, double scoreThreshold) {
|
||||
return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository,modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS, scoreName, scoreThreshold);
|
||||
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {
|
||||
return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository,modelMap,
|
||||
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS, scoreName, scoreThreshold);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -43,6 +43,7 @@ import org.mockito.stubbing.Answer;
|
||||
public abstract class TestStatusAnalyticsEngine {
|
||||
|
||||
static final long DEFAULT_PREDICT_INTERVAL_MILLIS = 3L * 60 * 1000;
|
||||
static final long DEFAULT_QUERY_INTERVAL_MILLIS = 3L * 60 * 1000;
|
||||
static final String DEFAULT_SCORE_NAME = "rSquared";
|
||||
static final double DEFAULT_SCORE_THRESHOLD = .9;
|
||||
|
||||
@ -89,13 +90,13 @@ public abstract class TestStatusAnalyticsEngine {
|
||||
@Test
|
||||
public void testGetStatusAnalytics() {
|
||||
StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS,
|
||||
DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
|
||||
DEFAULT_QUERY_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
|
||||
StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics("1");
|
||||
assertNotNull(statusAnalytics);
|
||||
}
|
||||
|
||||
public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
|
||||
ComponentStatusRepository componentStatusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
|
||||
long predictIntervalMillis, String scoreName, double scoreThreshold);
|
||||
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold);
|
||||
|
||||
}
|
||||
|
@ -212,6 +212,7 @@
|
||||
<!-- nifi.properties: analytics properties -->
|
||||
<nifi.analytics.predict.enabled>false</nifi.analytics.predict.enabled>
|
||||
<nifi.analytics.predict.interval>3 mins</nifi.analytics.predict.interval>
|
||||
<nifi.analytics.query.interval>3 mins</nifi.analytics.query.interval>
|
||||
<nifi.analytics.connection.model.implementation>org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares</nifi.analytics.connection.model.implementation>
|
||||
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
|
||||
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
|
||||
|
@ -260,6 +260,7 @@ nifi.variable.registry.properties=
|
||||
# analytics properties #
|
||||
nifi.analytics.predict.enabled=${nifi.analytics.predict.enabled}
|
||||
nifi.analytics.predict.interval=${nifi.analytics.predict.interval}
|
||||
nifi.analytics.query.interval=${nifi.analytics.query.interval}
|
||||
nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation}
|
||||
nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name}
|
||||
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}
|
||||
|
Loading…
x
Reference in New Issue
Block a user