NIFI-7437 - created separate thread for preloading predictions, refactors for performance

NIFI-7437 - reduced scheduler to 15 seconds, change cache to expire after no access vs expire after write

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4274
This commit is contained in:
Yolanda M. Davis 2020-05-14 05:49:47 -04:00 committed by Matthew Burgess
parent c51b9051a8
commit 13418ccb91
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
8 changed files with 122 additions and 123 deletions

View File

@ -120,6 +120,7 @@ import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardQueueProvider;
@ -151,6 +152,7 @@ import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine;
import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsModelMapFactory;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
@ -683,8 +685,28 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory = new StatusAnalyticsModelMapFactory(extensionManager, nifiProperties);
analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, statusAnalyticsModelMapFactory,
analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, statusAnalyticsModelMapFactory,
predictionIntervalMillis, queryIntervalMillis, modelScoreName, modelScoreThreshold);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
Long startTs = System.currentTimeMillis();
RepositoryStatusReport statusReport = flowFileEventRepository.reportTransferEvents(startTs);
flowManager.findAllConnections().forEach(connection -> {
ConnectionStatusAnalytics connectionStatusAnalytics = ((ConnectionStatusAnalytics)analyticsEngine.getStatusAnalytics(connection.getIdentifier()));
connectionStatusAnalytics.refresh();
connectionStatusAnalytics.loadPredictions(statusReport);
});
Long endTs = System.currentTimeMillis();
LOG.debug("Time Elapsed for Prediction for loading all predictions: {}", endTs - startTs);
} catch (final Exception e) {
LOG.error("Failed to generate predictions", e);
}
}
}, 0L, 15, TimeUnit.SECONDS);
}
eventAccess = new StandardEventAccess(this, flowFileEventRepository);

View File

@ -19,7 +19,6 @@ package org.apache.nifi.controller.status.analytics;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,13 +36,13 @@ public class CachingConnectionStatusAnalyticsEngine extends ConnectionStatusAnal
private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class);
public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository,
FlowFileEventRepository flowFileEventRepository, StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory,
StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory,
long predictionIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {
super(flowManager, statusRepository, flowFileEventRepository, statusAnalyticsModelMapFactory, predictionIntervalMillis,
super(flowManager, statusRepository, statusAnalyticsModelMapFactory, predictionIntervalMillis,
queryIntervalMillis, scoreName, scoreThreshold);
this.cache = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.MINUTES)
.expireAfterAccess(5, TimeUnit.MINUTES)
.build();
}
@ -60,9 +59,6 @@ public class CachingConnectionStatusAnalyticsEngine extends ConnectionStatusAnal
LOG.debug("Creating new status analytics object for connection id: {}", identifier);
connectionStatusAnalytics = super.getStatusAnalytics(identifier);
cache.put(identifier, connectionStatusAnalytics);
} else {
LOG.debug("Pulled existing analytics from cache for connection id: {}", identifier);
((ConnectionStatusAnalytics)connectionStatusAnalytics).refresh();
}
return connectionStatusAnalytics;

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.commons.collections4.MapUtils;
@ -28,7 +29,6 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.StatusHistory;
@ -49,7 +49,6 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
private final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap;
private QueryWindow queryWindow;
private final ComponentStatusRepository componentStatusRepository;
private final FlowFileEventRepository flowFileEventRepository;
private final String connectionIdentifier;
private final FlowManager flowManager;
private final Boolean supportOnlineLearning;
@ -58,15 +57,24 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
private long queryIntervalMillis = 5L * 60 * 1000; //Default is 3 minutes
private String scoreName = "rSquared";
private double scoreThreshold = .90;
private Map<String, Long> predictions;
public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
private static String TIME_TO_BYTE_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis";
private static String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis";
private static String NEXT_INTERVAL_BYTES = "nextIntervalBytes";
private static String NEXT_INTERVAL_COUNT = "nextIntervalCount";
private static String NEXT_INTERVAL_PERCENTAGE_USE_COUNT = "nextIntervalPercentageUseCount";
private static String NEXT_INTERVAL_PERCENTAGE_USE_BYTES = "nextIntervalPercentageUseBytes";
private static String INTERVAL_TIME_MILLIS = "intervalTimeMillis";
public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager,
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, String connectionIdentifier, Boolean supportOnlineLearning) {
this.componentStatusRepository = componentStatusRepository;
this.flowManager = flowManager;
this.flowFileEventRepository = flowFileEventRepository;
this.modelMap = modelMap;
this.connectionIdentifier = connectionIdentifier;
this.supportOnlineLearning = supportOnlineLearning;
predictions = initPredictions();
}
/**
@ -126,15 +134,9 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
*
* @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
*/
public Long getTimeToBytesBackpressureMillis() {
Long getTimeToBytesBackpressureMillis(final Connection connection, FlowFileEvent flowFileEvent) {
final StatusAnalyticsModel bytesModel = getModel("queuedBytes");
FlowFileEvent flowFileEvent = getStatusReport();
final Connection connection = getConnection();
if (connection == null) {
throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
}
final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
@ -154,15 +156,9 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
*
* @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
*/
public Long getTimeToCountBackpressureMillis() {
Long getTimeToCountBackpressureMillis(final Connection connection, FlowFileEvent flowFileEvent) {
final StatusAnalyticsModel countModel = getModel("queuedCount");
FlowFileEvent flowFileEvent = getStatusReport();
final Connection connection = getConnection();
if (connection == null) {
throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
}
final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
@ -183,9 +179,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
* @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
*/
public Long getNextIntervalBytes() {
Long getNextIntervalBytes(FlowFileEvent flowFileEvent) {
final StatusAnalyticsModel bytesModel = getModel("queuedBytes");
FlowFileEvent flowFileEvent = getStatusReport();
if (validModel(bytesModel) && flowFileEvent != null) {
List<Double> predictFeatures = new ArrayList<>();
@ -206,9 +201,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
* @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
*/
public Long getNextIntervalCount() {
Long getNextIntervalCount(FlowFileEvent flowFileEvent) {
final StatusAnalyticsModel countModel = getModel("queuedCount");
FlowFileEvent flowFileEvent = getStatusReport();
if (validModel(countModel) && flowFileEvent != null) {
List<Double> predictFeatures = new ArrayList<>();
@ -229,14 +223,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
*
* @return percentage of bytes used at next interval
*/
public Long getNextIntervalPercentageUseCount() {
Long getNextIntervalPercentageUseCount(final Connection connection, FlowFileEvent flowFileEvent) {
final Connection connection = getConnection();
if (connection == null) {
throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
}
final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
final long nextIntervalCount = getNextIntervalCount();
final long nextIntervalCount = getNextIntervalCount(flowFileEvent);
if (nextIntervalCount > -1L) {
return Math.min(100, Math.round((nextIntervalCount / backPressureCountThreshold) * 100));
@ -252,18 +242,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
* @return percentage of bytes used at next interval
*/
public Long getNextIntervalPercentageUseBytes() {
Long getNextIntervalPercentageUseBytes(final Connection connection, FlowFileEvent flowFileEvent) {
final Connection connection = getConnection();
if (connection == null) {
throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
}
final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
final long nextIntervalBytes = getNextIntervalBytes();
final long nextIntervalBytes = getNextIntervalBytes(flowFileEvent);
if (nextIntervalBytes > -1L) {
return Math.min(100, Math.round((getNextIntervalBytes() / backPressureBytes) * 100));
return Math.min(100, Math.round((getNextIntervalBytes(flowFileEvent) / backPressureBytes) * 100));
} else {
return -1L;
}
@ -311,42 +297,45 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
*/
@Override
public Map<String, Long> getPredictions() {
Map<String, Long> predictions = new HashMap<>();
predictions.put("timeToBytesBackpressureMillis", getTimeToBytesBackpressureMillis());
predictions.put("timeToCountBackpressureMillis", getTimeToCountBackpressureMillis());
predictions.put("nextIntervalBytes", getNextIntervalBytes());
predictions.put("nextIntervalCount", getNextIntervalCount());
predictions.put("nextIntervalPercentageUseCount", getNextIntervalPercentageUseCount());
predictions.put("nextIntervalPercentageUseBytes", getNextIntervalPercentageUseBytes());
predictions.put("intervalTimeMillis", getIntervalTimeMillis());
predictions.forEach((key, value) -> {
LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier, key, value);
});
return predictions;
}
public void loadPredictions(final RepositoryStatusReport statusReport) {
long startTs = System.currentTimeMillis();
Connection connection = flowManager.getConnection(connectionIdentifier);
if (connection == null) {
throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
}
FlowFileEvent flowFileEvent = statusReport.getReportEntry(connectionIdentifier);
predictions.put(TIME_TO_BYTE_BACKPRESSURE_MILLIS, getTimeToBytesBackpressureMillis(connection, flowFileEvent));
predictions.put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, getTimeToCountBackpressureMillis(connection, flowFileEvent));
predictions.put(NEXT_INTERVAL_BYTES, getNextIntervalBytes(flowFileEvent));
predictions.put(NEXT_INTERVAL_COUNT, getNextIntervalCount(flowFileEvent));
predictions.put(NEXT_INTERVAL_PERCENTAGE_USE_COUNT, getNextIntervalPercentageUseCount(connection, flowFileEvent));
predictions.put(NEXT_INTERVAL_PERCENTAGE_USE_BYTES, getNextIntervalPercentageUseBytes(connection, flowFileEvent));
predictions.put(INTERVAL_TIME_MILLIS, getIntervalTimeMillis());
long endTs = System.currentTimeMillis();
LOG.debug("Prediction Calculations for connectionID {}: {}", connectionIdentifier, endTs - startTs);
predictions.forEach((key, value) -> {
LOG.trace("Prediction model for connection id {}: {}={} ", connectionIdentifier, key, value);
});
}
@Override
public boolean supportsOnlineLearning() {
return supportOnlineLearning;
}
private Connection getConnection() {
Connection connection = null;
for (Connection c : flowManager.findAllConnections()) {
if (c.getIdentifier().equals(this.connectionIdentifier)) {
connection = c;
break;
}
}
return connection;
}
private FlowFileEvent getStatusReport() {
RepositoryStatusReport statusReport = flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
return statusReport.getReportEntry(this.connectionIdentifier);
private Map<String, Long> initPredictions() {
predictions = new ConcurrentHashMap<>();
predictions.put(TIME_TO_BYTE_BACKPRESSURE_MILLIS, -1L);
predictions.put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, -1L);
predictions.put(NEXT_INTERVAL_BYTES, -1L);
predictions.put(NEXT_INTERVAL_COUNT, -1L);
predictions.put(NEXT_INTERVAL_PERCENTAGE_USE_COUNT, -1L);
predictions.put(NEXT_INTERVAL_PERCENTAGE_USE_BYTES, -1L);
predictions.put(INTERVAL_TIME_MILLIS, -1L);
return predictions;
}
/**

View File

@ -19,7 +19,6 @@ package org.apache.nifi.controller.status.analytics;
import java.util.Map;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
@ -34,19 +33,17 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class);
protected final ComponentStatusRepository statusRepository;
protected final FlowManager flowManager;
protected final FlowFileEventRepository flowFileEventRepository;
protected final StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory;
protected final long predictionIntervalMillis;
protected final long queryIntervalMillis;
protected final String scoreName;
protected final double scoreThreshold;
public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository,
public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository,
StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory, long predictionIntervalMillis,
long queryIntervalMillis, String scoreName, double scoreThreshold) {
this.flowManager = flowManager;
this.statusRepository = statusRepository;
this.flowFileEventRepository = flowFileEventRepository;
this.predictionIntervalMillis = predictionIntervalMillis;
this.statusAnalyticsModelMapFactory = statusAnalyticsModelMapFactory;
this.queryIntervalMillis = queryIntervalMillis;
@ -62,12 +59,11 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
@Override
public StatusAnalytics getStatusAnalytics(String identifier) {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = statusAnalyticsModelMapFactory.getConnectionStatusModelMap();
ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, flowFileEventRepository, modelMap, identifier, false);
ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, modelMap, identifier, false);
connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis);
connectionStatusAnalytics.setQueryIntervalMillis(queryIntervalMillis);
connectionStatusAnalytics.setScoreName(scoreName);
connectionStatusAnalytics.setScoreThreshold(scoreThreshold);
connectionStatusAnalytics.refresh();
return connectionStatusAnalytics;
}

View File

@ -20,24 +20,23 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.junit.Test;
public class TestCachingConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngine {
@Override
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager,
ComponentStatusRepository componentStatusRepository, StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory,
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {
return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, statusAnalyticsModelMapFactory, predictIntervalMillis,
return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, statusAnalyticsModelMapFactory, predictIntervalMillis,
queryIntervalMillis, scoreName, scoreThreshold);
}
@Test
public void testCachedStatusAnalytics() {
StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, statusAnalyticsModelMapFactory,
StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, statusAnalyticsModelMapFactory,
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS,
DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
StatusAnalytics statusAnalyticsA = statusAnalyticsEngine.getStatusAnalytics("A");

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
@ -42,7 +41,6 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
@ -64,6 +62,10 @@ public class TestConnectionStatusAnalytics {
.map(ConnectionStatusDescriptor::getDescriptor)
.collect(Collectors.toSet());
final Connection connection = Mockito.mock(Connection.class);
final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class);
final RepositoryStatusReport repositoryStatusReport = Mockito.mock(RepositoryStatusReport.class);
protected ConnectionStatusAnalytics getConnectionStatusAnalytics(Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap) {
ComponentStatusRepository statusRepository = Mockito.mock(ComponentStatusRepository.class);
@ -80,11 +82,7 @@ public class TestConnectionStatusAnalytics {
final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
final StatusHistory statusHistory = Mockito.mock(StatusHistory.class);
final Connection connection = Mockito.mock(Connection.class);
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
final FlowFileEventRepository flowFileEventRepository = Mockito.mock(FlowFileEventRepository.class);
final RepositoryStatusReport repositoryStatusReport = Mockito.mock(RepositoryStatusReport.class);
final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class);
final Set<Connection> connections = new HashSet<>();
@ -95,17 +93,15 @@ public class TestConnectionStatusAnalytics {
when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(100L);
when(connection.getIdentifier()).thenReturn(connectionIdentifier);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
when(flowManager.findAllConnections()).thenReturn(connections);
when(flowManager.getRootGroup()).thenReturn(processGroup);
when(flowManager.getConnection(anyString())).thenReturn(connection);
when(flowFileEvent.getContentSizeIn()).thenReturn(10L);
when(flowFileEvent.getContentSizeOut()).thenReturn(10L);
when(flowFileEvent.getFlowFilesIn()).thenReturn(10);
when(flowFileEvent.getFlowFilesOut()).thenReturn(10);
when(flowFileEventRepository.reportTransferEvents(anyLong())).thenReturn(repositoryStatusReport);
when(repositoryStatusReport.getReportEntry(anyString())).thenReturn(flowFileEvent);
when(statusRepository.getConnectionStatusHistory(anyString(), any(), any(), anyInt())).thenReturn(statusHistory);
ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager,flowFileEventRepository,
ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager,
modelMap, connectionIdentifier, false);
connectionStatusAnalytics.refresh();
return connectionStatusAnalytics;
@ -144,7 +140,7 @@ public class TestConnectionStatusAnalytics {
Long tomorrowMillis = DateUtils.addDays(now,1).toInstant().toEpochMilli();
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.5,100.0,tomorrowMillis.doubleValue());
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -153,7 +149,7 @@ public class TestConnectionStatusAnalytics {
public void testInvalidModelNaNScore() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",Double.NaN,Double.NaN,Double.NaN);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -162,7 +158,7 @@ public class TestConnectionStatusAnalytics {
public void testInvalidModelInfiniteScore() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -182,7 +178,7 @@ public class TestConnectionStatusAnalytics {
Long tomorrowMillis = DateUtils.addDays(now,1).toInstant().toEpochMilli();
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,100.0,tomorrowMillis.doubleValue());
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime > 0);
}
@ -191,7 +187,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictTimeToCountNaN() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,Double.NaN,Double.NaN);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -200,7 +196,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictTimeToCountInfinite() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -209,7 +205,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictTimeToCountNegative() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,-1.0,-1.0);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -219,7 +215,7 @@ public class TestConnectionStatusAnalytics {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("fakeModel",Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
try {
connectionStatusAnalytics.getTimeToCountBackpressureMillis();
connectionStatusAnalytics.getTimeToCountBackpressureMillis(connection, flowFileEvent);
fail();
}catch(IllegalArgumentException iae){
assertTrue(true);
@ -232,7 +228,7 @@ public class TestConnectionStatusAnalytics {
Long tomorrowMillis = DateUtils.addDays(now,1).toInstant().toEpochMilli();
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,100.0,tomorrowMillis.doubleValue());
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime > 0);
}
@ -241,7 +237,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictTimeToBytesNaN() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,Double.NaN,Double.NaN);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -250,7 +246,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictTimeToBytesInfinite() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -259,7 +255,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictTimeToBytesNegative() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,-1.0,-1.0);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long countTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
Long countTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis(connection, flowFileEvent);
assertNotNull(countTime);
assert (countTime == -1);
}
@ -269,7 +265,7 @@ public class TestConnectionStatusAnalytics {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("fakeModel",Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
try {
connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
connectionStatusAnalytics.getTimeToBytesBackpressureMillis(connection, flowFileEvent);
fail();
}catch(IllegalArgumentException iae){
assertTrue(true);
@ -280,7 +276,7 @@ public class TestConnectionStatusAnalytics {
public void testGetNextIntervalBytes() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,1.0,1.0);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalBytes();
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalBytes(flowFileEvent);
assertNotNull(nextIntervalBytes);
assert (nextIntervalBytes > 0);
}
@ -289,7 +285,7 @@ public class TestConnectionStatusAnalytics {
public void testNextIntervalBytesZero() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,-1.0,-1.0);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalBytes();
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalBytes(flowFileEvent);
assertNotNull(nextIntervalBytes);
assert (nextIntervalBytes == 0);
}
@ -298,7 +294,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictNextIntervalBytesNaN() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,Double.NaN,Double.NaN);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalBytes();
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalBytes(flowFileEvent);
assertNotNull(nextIntervalBytes);
assert (nextIntervalBytes == -1);
}
@ -307,7 +303,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictNextIntervalBytesInfinity() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalBytes();
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalBytes(flowFileEvent);
assertNotNull(nextIntervalBytes);
assert (nextIntervalBytes == -1);
}
@ -317,7 +313,7 @@ public class TestConnectionStatusAnalytics {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("fakeModel",Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
try {
connectionStatusAnalytics.getNextIntervalBytes();
connectionStatusAnalytics.getNextIntervalBytes(flowFileEvent);
fail();
}catch(IllegalArgumentException iae){
assertTrue(true);
@ -328,7 +324,7 @@ public class TestConnectionStatusAnalytics {
public void testGetNextIntervalCount() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,1.0,1.0);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalCount();
Long nextIntervalBytes = connectionStatusAnalytics.getNextIntervalCount(flowFileEvent);
assertNotNull(nextIntervalBytes);
assert (nextIntervalBytes > 0);
}
@ -337,7 +333,7 @@ public class TestConnectionStatusAnalytics {
public void testGetNextIntervalCountZero() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,-1.0,-1.0);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long nextIntervalCount = connectionStatusAnalytics.getNextIntervalCount();
Long nextIntervalCount = connectionStatusAnalytics.getNextIntervalCount(flowFileEvent);
assertNotNull(nextIntervalCount);
assert (nextIntervalCount == 0);
}
@ -346,7 +342,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictNextIntervalCountNaN() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,Double.NaN,Double.NaN);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long nextIntervalCount = connectionStatusAnalytics.getNextIntervalCount();
Long nextIntervalCount = connectionStatusAnalytics.getNextIntervalCount(flowFileEvent);
assertNotNull(nextIntervalCount);
assert (nextIntervalCount == -1);
}
@ -355,7 +351,7 @@ public class TestConnectionStatusAnalytics {
public void testCannotPredictNextIntervalCountInfinity() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long nextIntervalCount = connectionStatusAnalytics.getNextIntervalCount();
Long nextIntervalCount = connectionStatusAnalytics.getNextIntervalCount(flowFileEvent);
assertNotNull(nextIntervalCount);
assert (nextIntervalCount == -1);
}
@ -365,7 +361,7 @@ public class TestConnectionStatusAnalytics {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("fakeModel",Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY,Double.POSITIVE_INFINITY);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
try {
connectionStatusAnalytics.getNextIntervalCount();
connectionStatusAnalytics.getNextIntervalCount(flowFileEvent);
fail();
}catch(IllegalArgumentException iae){
assertTrue(true);
@ -376,7 +372,7 @@ public class TestConnectionStatusAnalytics {
public void testGetNextIntervalPercentageUseCount() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedCount",.9,50.0,1.0);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long percentage = connectionStatusAnalytics.getNextIntervalPercentageUseCount();
Long percentage = connectionStatusAnalytics.getNextIntervalPercentageUseCount(connection, flowFileEvent);
assertNotNull(percentage);
assert (percentage == 50);
}
@ -385,7 +381,7 @@ public class TestConnectionStatusAnalytics {
public void testGetNextIntervalPercentageUseBytes() {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = getModelMap("queuedBytes",.9,10000000.0,1.0);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(modelMap);
Long percentage = connectionStatusAnalytics.getNextIntervalPercentageUseBytes();
Long percentage = connectionStatusAnalytics.getNextIntervalPercentageUseBytes(connection, flowFileEvent);
assertNotNull(percentage);
assert (percentage == 10);
}
@ -398,6 +394,7 @@ public class TestConnectionStatusAnalytics {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> countModelMap = getModelMap("queuedCount",.9,50.0,tomorrowMillis.doubleValue());
countModelMap.putAll(bytesModelMap);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(countModelMap);
connectionStatusAnalytics.loadPredictions(repositoryStatusReport);
Map<String,Long> scores = connectionStatusAnalytics.getPredictions();
assertNotNull(scores);
assertFalse(scores.isEmpty());
@ -418,6 +415,7 @@ public class TestConnectionStatusAnalytics {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> countModelMap = getModelMap("queuedCount",.1,50.0,tomorrowMillis.doubleValue());
countModelMap.putAll(bytesModelMap);
ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(countModelMap);
connectionStatusAnalytics.loadPredictions(repositoryStatusReport);
Map<String,Long> scores = connectionStatusAnalytics.getPredictions();
assertNotNull(scores);
assertFalse(scores.isEmpty());

View File

@ -17,16 +17,15 @@
package org.apache.nifi.controller.status.analytics;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
public class TestConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngine {
@Override
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
ComponentStatusRepository statusRepository, StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory,
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {
return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository,statusAnalyticsModelMapFactory,
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository,
StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory,
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {
return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, statusAnalyticsModelMapFactory,
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS, scoreName, scoreThreshold);
}

View File

@ -92,13 +92,13 @@ public abstract class TestStatusAnalyticsEngine {
@Test
public void testGetStatusAnalytics() {
StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, statusAnalyticsModelMapFactory, DEFAULT_PREDICT_INTERVAL_MILLIS,
StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager, statusRepository, statusAnalyticsModelMapFactory, DEFAULT_PREDICT_INTERVAL_MILLIS,
DEFAULT_QUERY_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics("1");
assertNotNull(statusAnalytics);
}
public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager,
ComponentStatusRepository componentStatusRepository, StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory,
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold);