NIFI-13465: Fix NPE in QueryNiFiReportingTask when analytics tables are queried without analytics enabled

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #9016.
This commit is contained in:
Matt Burgess 2024-06-28 11:53:56 -04:00 committed by Pierre Villard
parent 229364fc31
commit 239b9362f4
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 60 additions and 6 deletions

View File

@ -148,12 +148,11 @@ public class MetricsSqlQueryService implements MetricsQueryService {
final NiFiTable connectionStatusTable = new NiFiTable("CONNECTION_STATUS", connectionStatusDataSource, getLogger()); final NiFiTable connectionStatusTable = new NiFiTable("CONNECTION_STATUS", connectionStatusDataSource, getLogger());
database.addTable(connectionStatusTable); database.addTable(connectionStatusTable);
if (context.isAnalyticsEnabled()) { final ResettableDataSource predictionDataSource = new ConnectionStatusPredictionDataSource(context, groupStatusCache);
final ResettableDataSource predictionDataSource = new ConnectionStatusPredictionDataSource(context, groupStatusCache); final NiFiTable connectionStatusPredictionsTable = new NiFiTable("CONNECTION_STATUS_PREDICTIONS", predictionDataSource, getLogger());
final NiFiTable connectionStatusPredictionsTable = new NiFiTable("CONNECTION_STATUS_PREDICTIONS", predictionDataSource, getLogger()); database.addTable(connectionStatusPredictionsTable);
database.addTable(connectionStatusPredictionsTable); if (!context.isAnalyticsEnabled()) {
} else { getLogger().info("Analytics is not enabled, CONNECTION_STATUS_PREDICTIONS table will not contain any rows");
getLogger().debug("Analytics is not enabled, CONNECTION_STATUS_PREDICTIONS table is not available for querying");
} }
final ResettableDataSource processorStatusDataSource = new ProcessorStatusDataSource(context, groupStatusCache); final ResettableDataSource processorStatusDataSource = new ProcessorStatusDataSource(context, groupStatusCache);

View File

@ -88,6 +88,10 @@ public class ConnectionStatusPredictionDataSource implements ResettableDataSourc
private Object[] toArray(final ConnectionStatus status) { private Object[] toArray(final ConnectionStatus status) {
final ConnectionStatusPredictions predictions = status.getPredictions(); final ConnectionStatusPredictions predictions = status.getPredictions();
if (predictions == null) {
return new Object[8];
}
return new Object[] { return new Object[] {
status.getId(), status.getId(),
predictions.getNextPredictedQueuedBytes(), predictions.getNextPredictedQueuedBytes(),

View File

@ -30,6 +30,7 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.MockProvenanceEvent; import org.apache.nifi.provenance.MockProvenanceEvent;
@ -128,11 +129,29 @@ class TestQueryNiFiReportingTask {
root1ConnectionStatus.setId("root1"); root1ConnectionStatus.setId("root1");
root1ConnectionStatus.setQueuedCount(1000); root1ConnectionStatus.setQueuedCount(1000);
root1ConnectionStatus.setBackPressureObjectThreshold(1000); root1ConnectionStatus.setBackPressureObjectThreshold(1000);
// Set backpressure predictions
ConnectionStatusPredictions connectionStatusPredictions1 = new ConnectionStatusPredictions();
connectionStatusPredictions1.setPredictedTimeToCountBackpressureMillis(2000);
connectionStatusPredictions1.setPredictedTimeToBytesBackpressureMillis(2000);
connectionStatusPredictions1.setNextPredictedQueuedBytes(1024);
connectionStatusPredictions1.setNextPredictedQueuedCount(1);
connectionStatusPredictions1.setPredictedPercentBytes(55);
connectionStatusPredictions1.setPredictedPercentCount(30);
root1ConnectionStatus.setPredictions(connectionStatusPredictions1);
ConnectionStatus root2ConnectionStatus = new ConnectionStatus(); ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
root2ConnectionStatus.setId("root2"); root2ConnectionStatus.setId("root2");
root2ConnectionStatus.setQueuedCount(500); root2ConnectionStatus.setQueuedCount(500);
root2ConnectionStatus.setBackPressureObjectThreshold(1000); root2ConnectionStatus.setBackPressureObjectThreshold(1000);
// Set backpressure predictions
ConnectionStatusPredictions connectionStatusPredictions2 = new ConnectionStatusPredictions();
connectionStatusPredictions2.setPredictedTimeToBytesBackpressureMillis(2000);
connectionStatusPredictions2.setPredictedTimeToBytesBackpressureMillis(2000);
connectionStatusPredictions2.setNextPredictedQueuedBytes(1024);
connectionStatusPredictions2.setNextPredictedQueuedCount(1);
connectionStatusPredictions2.setPredictedPercentBytes(55);
connectionStatusPredictions2.setPredictedPercentCount(30);
root2ConnectionStatus.setPredictions(connectionStatusPredictions2);
Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>(); Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>();
rootConnectionStatuses.add(root1ConnectionStatus); rootConnectionStatuses.add(root1ConnectionStatus);
@ -151,6 +170,15 @@ class TestQueryNiFiReportingTask {
ConnectionStatus nestedConnectionStatus = new ConnectionStatus(); ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
nestedConnectionStatus.setId("nested"); nestedConnectionStatus.setId("nested");
nestedConnectionStatus.setQueuedCount(1001); nestedConnectionStatus.setQueuedCount(1001);
// Set backpressure predictions
ConnectionStatusPredictions connectionStatusPredictions3 = new ConnectionStatusPredictions();
connectionStatusPredictions3.setPredictedTimeToBytesBackpressureMillis(2000);
connectionStatusPredictions3.setPredictedTimeToBytesBackpressureMillis(2000);
connectionStatusPredictions3.setNextPredictedQueuedBytes(1024);
connectionStatusPredictions3.setNextPredictedQueuedCount(1);
connectionStatusPredictions3.setPredictedPercentBytes(55);
connectionStatusPredictions3.setPredictedPercentCount(30);
nestedConnectionStatus.setPredictions(connectionStatusPredictions3);
Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>(); Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>();
nestedConnectionStatuses.add(nestedConnectionStatus); nestedConnectionStatuses.add(nestedConnectionStatus);
groupStatus2.setConnectionStatus(nestedConnectionStatuses); groupStatus2.setConnectionStatus(nestedConnectionStatuses);
@ -163,6 +191,15 @@ class TestQueryNiFiReportingTask {
ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus(); ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
nestedConnectionStatus2.setId("nested2"); nestedConnectionStatus2.setId("nested2");
nestedConnectionStatus2.setQueuedCount(3); nestedConnectionStatus2.setQueuedCount(3);
// Set backpressure predictions
ConnectionStatusPredictions connectionStatusPredictions4 = new ConnectionStatusPredictions();
connectionStatusPredictions4.setPredictedTimeToBytesBackpressureMillis(2000);
connectionStatusPredictions4.setPredictedTimeToBytesBackpressureMillis(2000);
connectionStatusPredictions4.setNextPredictedQueuedBytes(1024);
connectionStatusPredictions4.setNextPredictedQueuedCount(1);
connectionStatusPredictions4.setPredictedPercentBytes(55);
connectionStatusPredictions4.setPredictedPercentCount(30);
nestedConnectionStatus2.setPredictions(connectionStatusPredictions4);
Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>(); Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>();
nestedConnectionStatuses2.add(nestedConnectionStatus2); nestedConnectionStatuses2.add(nestedConnectionStatus2);
groupStatus3.setConnectionStatus(nestedConnectionStatuses2); groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
@ -226,6 +263,20 @@ class TestQueryNiFiReportingTask {
assertEquals(3, row.get("queuedCount")); assertEquals(3, row.get("queuedCount"));
} }
@Test
void testConnectionStatusTableJoin() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "SELECT id "
+ "FROM CONNECTION_STATUS "
+ "JOIN CONNECTION_STATUS_PREDICTIONS ON CONNECTION_STATUS_PREDICTIONS.connectionId = CONNECTION_STATUS.id");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(4, rows.size());
}
@Test @Test
void testBulletinIsInTimeWindow() throws InitializationException { void testBulletinIsInTimeWindow() throws InitializationException {
String query = "select * from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime"; String query = "select * from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";