NIFI-9327: Added timewindow query to QueryNiFiReportingTask and MetricsEventReportingTask

This commit is contained in:
Lehel 2021-10-27 11:12:13 +02:00 committed by Tamas Palfy
parent 61b867cda2
commit 72e6accc12
11 changed files with 649 additions and 156 deletions

View File

@ -16,10 +16,12 @@
*/
package org.apache.nifi.reporting.sql;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
@ -31,19 +33,23 @@ import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
@Tags({"reporting", "rules", "action", "action handler", "status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"})
@CapabilityDescription("Triggers rules-driven actions based on metrics values ")
public class MetricsEventReportingTask extends AbstractReportingTask {
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last execution time so that on restart the task knows where it left off.")
public class MetricsEventReportingTask extends AbstractReportingTask implements QueryTimeAware {
private List<PropertyDescriptor> properties;
private MetricsQueryService metricsQueryService;
@ -67,7 +73,7 @@ public class MetricsEventReportingTask extends AbstractReportingTask {
}
@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
public void setup(final ConfigurationContext context) {
actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
final Integer defaultPrecision = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
@ -77,17 +83,20 @@ public class MetricsEventReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(ReportingContext context) {
String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
try {
final String query = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
fireRules(context, actionHandler, rulesEngineService, query);
sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME);
sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME);
fireRules(context, actionHandler, rulesEngineService, sql);
} catch (Exception e) {
getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e);
}
}
private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception {
getLogger().debug("Executing query: {}", query);
QueryResult queryResult = metricsQueryService.query(context, query);
getLogger().debug("Executing query: {}", new Object[]{ query });
ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
Record record;
try {
@ -97,16 +106,14 @@ public class MetricsEventReportingTask extends AbstractReportingTask {
facts.put(fieldName, record.getValue(fieldName));
}
List<Action> actions = engine.fireRules(facts);
if(actions == null || actions.isEmpty()){
if (actions == null || actions.isEmpty()) {
getLogger().debug("No actions required for provided facts.");
} else {
actions.forEach(action -> {
actionHandler.execute(context, action,facts);
});
actions.forEach(action -> actionHandler.execute(context, action, facts));
}
}
} finally {
metricsQueryService.closeQuietly(recordSet);
}
}
}
}

View File

@ -16,10 +16,12 @@
*/
package org.apache.nifi.reporting.sql;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.AbstractReportingTask;
@ -29,7 +31,6 @@ import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -38,6 +39,10 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
@ -46,7 +51,8 @@ import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFA
+ "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables, and can use any functions or capabilities provided by Apache Calcite. Note that the "
+ "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled (see the nifi.analytics.predict.enabled property in nifi.properties). Attempting a "
+ "query on the table when the capability is disabled will cause an error.")
public class QueryNiFiReportingTask extends AbstractReportingTask {
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last execution time so that on restart the task knows where it left off.")
public class QueryNiFiReportingTask extends AbstractReportingTask implements QueryTimeAware {
private List<PropertyDescriptor> properties;
@ -71,7 +77,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
}
@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
public void setup(final ConfigurationContext context) {
recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
recordSinkService.reset();
final Integer defaultPrecision = context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
@ -82,13 +88,16 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(ReportingContext context) {
final StopWatch stopWatch = new StopWatch(true);
String sql = context.getProperty(QueryMetricsUtil.QUERY).getValue();
try {
final String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME);
sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME);
getLogger().debug("Executing query: {}", sql);
final QueryResult queryResult = metricsQueryService.query(context, sql);
final ResultSetRecordSet recordSet;
try {
getLogger().debug("Executing query: {}", new Object[]{sql});
recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
} catch (final Exception e) {
getLogger().error("Error creating record set from query results due to {}", new Object[]{e.getMessage()}, e);
@ -110,10 +119,10 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
metricsQueryService.closeQuietly(queryResult);
}
final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
getLogger().debug("Successfully queried and sent in {} millis", new Object[]{elapsedMillis});
getLogger().debug("Successfully queried and sent in {} millis", elapsedMillis);
} catch (Exception e) {
getLogger().error("Error processing the query due to {}", new Object[]{e.getMessage()}, e);
}
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
public interface QueryTimeAware {
default String processStartAndEndTimes(ReportingContext context, String sql, TrackedQueryTime queryStartTime, TrackedQueryTime queryEndTime) throws IOException {
StateManager stateManager = context.getStateManager();
final Map<String, String> stateMap = new HashMap<>(stateManager.getState(Scope.LOCAL).toMap());
if (sql.contains(queryStartTime.getSqlPlaceholder()) && sql.contains(queryEndTime.getSqlPlaceholder())) {
final long startTime = stateMap.get(queryStartTime.name()) == null ? 0 : Long.parseLong(stateMap.get(queryStartTime.name()));
final long currentTime = getCurrentTime();
sql = sql.replace(queryStartTime.getSqlPlaceholder(), String.valueOf(startTime));
sql = sql.replace(queryEndTime.getSqlPlaceholder(), String.valueOf(currentTime));
stateMap.put(queryStartTime.name(), String.valueOf(currentTime));
stateManager.setState(stateMap, Scope.LOCAL);
}
return sql;
}
default long getCurrentTime() {
return Instant.now().toEpochMilli();
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql.util;
public enum TrackedQueryTime {
BULLETIN_START_TIME("$bulletinStartTime"),
BULLETIN_END_TIME("$bulletinEndTime"),
PROVENANCE_START_TIME("$provenanceStartTime"),
PROVENANCE_END_TIME("$provenanceEndTime");
private final String sqlPlaceholder;
TrackedQueryTime(final String sqlPlaceholder) {
this.sqlPlaceholder = sqlPlaceholder;
}
public String getSqlPlaceholder() {
return sqlPlaceholder;
}
}

View File

@ -29,6 +29,35 @@
A distinct ActionHandler can be used to service all events or an ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Optimally action handler should be associated with the expected action types
returned from the rules engine.
</p>
<p>
The reporting task can uniquely handle items from the bulletin and provenance repositories. This means that an item will only be processed once when the query is set to unique.
The query can be set to unique by defining a time window with special sql placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime, $provenanceEndTime)
that the reporting task will evaluate runtime. See the SQL Query Examples section.
</p>
<br/><br/>
<h2>SQL Query Examples</h2>
<p>
<b>Example:</b> Select all fields from the <code>CONNECTION_STATUS</code> table:<br/>
<pre>SELECT * FROM CONNECTION_STATUS</pre>
</p>
<br/>
<p>
<b>Example:</b> Select connection IDs where time-to-backpressure (based on queue count) is less than 5 minutes:<br/>
<pre>SELECT connectionId FROM CONNECTION_STATUS_PREDICTIONS WHERE predictedTimeToCountBackpressureMillis < 300000</pre>
</p>
<br/>
<p>
<b>Example:</b> Get the unique bulletin categories associated with errors:<br/>
<pre>SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel = "ERROR"</pre>
</p>
<p>
<b>Example:</b> Select all fields from the <code>BULLETINS</code> table with time window:<br/>
<pre>SELECT * from BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND bulletinTimestamp <= $bulletinEndTime</pre>
</p>
<p>
<b>Example:</b> Select all fields from the <code>PROVENANCE</code> table with time window:<br/>
<pre>SELECT * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime</pre>
</p>
<br/>
</body>
</html>

View File

@ -35,6 +35,11 @@
SiteToSiteReportingRecordSink (for sending via the Site-to-Site protocol) or DatabaseRecordSink (for sending the
query result rows to an relational database).
</p>
<p>
The reporting task can uniquely handle items from the bulletin and provenance repositories. This means that an item will only be processed once when the query is set to unique.
The query can be set to unique by defining a time window with special sql placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime, $provenanceEndTime)
that the reporting task will evaluate runtime. See the SQL Query Examples section.
</p>
<br/>
<h2>Table Definitions</h2>
<p>
@ -221,6 +226,14 @@
<b>Example:</b> Get the unique bulletin categories associated with errors:<br/>
<pre>SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel = "ERROR"</pre>
</p>
<p>
<b>Example:</b> Select all fields from the <code>BULLETINS</code> table with time window:<br/>
<pre>SELECT * from BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND bulletinTimestamp <= $bulletinEndTime</pre>
</p>
<p>
<b>Example:</b> Select all fields from the <code>PROVENANCE</code> table with time window:<br/>
<pre>SELECT * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime</pre>
</p>
<br/>
</body>
</html>

View File

@ -33,10 +33,11 @@ import java.util.Map;
public class MockRecordSinkService extends AbstractConfigurableComponent implements RecordSinkService {
private List<Map<String, Object>> rows = new ArrayList<>();;
private List<Map<String, Object>> rows = new ArrayList<>();
@Override
public WriteResult sendData(RecordSet recordSet, Map<String,String> attributes, boolean sendZeroResults) throws IOException {
rows = new ArrayList<>();
int numRecordsWritten = 0;
RecordSchema recordSchema = recordSet.getSchema();
Record record;

View File

@ -16,10 +16,10 @@
*/
package org.apache.nifi.reporting.sql;
import com.google.common.collect.Lists;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
@ -27,21 +27,30 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinFactory;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.rules.Action;
import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
import org.apache.nifi.rules.MockPropertyContextActionHandler;
import org.apache.nifi.rules.PropertyContextActionHandler;
import org.apache.nifi.rules.engine.MockRulesEngineService;
import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockBulletinRepository;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.MockPropertyValue;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.db.JdbcProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -49,27 +58,34 @@ import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
class TestMetricsEventReportingTask {
public class TestMetricsEventReportingTask {
private ReportingContext context;
private MockMetricsEventReportingTask reportingTask;
private MockPropertyContextActionHandler actionHandler;
private MockRulesEngineService rulesEngineService;
private ProcessGroupStatus status;
private MockQueryBulletinRepository mockBulletinRepository;
private MockProvenanceRepository mockProvenanceRepository;
private AtomicLong currentTime;
private MockStateManager mockStateManager;
@BeforeEach
public void setup() {
currentTime = new AtomicLong();
status = new ProcessGroupStatus();
actionHandler = new MockPropertyContextActionHandler();
status.setId("1234");
@ -112,63 +128,153 @@ public class TestMetricsEventReportingTask {
rootConnectionStatuses.add(root1ConnectionStatus);
rootConnectionStatuses.add(root2ConnectionStatus);
status.setConnectionStatus(rootConnectionStatuses);
// create a group status with processing time
ProcessGroupStatus groupStatus1 = new ProcessGroupStatus();
groupStatus1.setProcessorStatus(processorStatuses);
groupStatus1.setBytesRead(1234L);
// Create a nested group status with a connection
ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
groupStatus2.setProcessorStatus(processorStatuses);
groupStatus2.setBytesRead(12345L);
ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
nestedConnectionStatus.setId("nested");
nestedConnectionStatus.setQueuedCount(1001);
Collection<ConnectionStatus> nestedConnectionStatuses = new ArrayList<>();
nestedConnectionStatuses.add(nestedConnectionStatus);
groupStatus2.setConnectionStatus(nestedConnectionStatuses);
Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
nestedGroupStatuses.add(groupStatus2);
groupStatus1.setProcessGroupStatus(nestedGroupStatuses);
ProcessGroupStatus groupStatus3 = new ProcessGroupStatus();
groupStatus3.setBytesRead(1L);
ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
nestedConnectionStatus2.setId("nested2");
nestedConnectionStatus2.setQueuedCount(3);
Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>();
nestedConnectionStatuses2.add(nestedConnectionStatus2);
groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
Collection<ProcessGroupStatus> nestedGroupStatuses2 = new ArrayList<>();
nestedGroupStatuses2.add(groupStatus3);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus1);
groupStatuses.add(groupStatus3);
status.setProcessGroupStatus(groupStatuses);
}
@Test
public void testConnectionStatusTable() throws IOException, InitializationException {
void testConnectionStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select connectionId, predictedQueuedCount, predictedTimeToBytesBackpressureMillis from CONNECTION_STATUS_PREDICTIONS");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String,Object>> metricsList = actionHandler.getRows();
List<Tuple<String, Action>> defaultLogActions = actionHandler.getDefaultActionsByType("LOG");
List<Tuple<String, Action>> defaultAlertActions = actionHandler.getDefaultActionsByType("ALERT");
List<PropertyContext> propertyContexts = actionHandler.getPropertyContexts();
assertFalse(metricsList.isEmpty());
assertEquals(2,defaultLogActions.size());
assertEquals(2,defaultAlertActions.size());
assertEquals(4,propertyContexts.size());
assertEquals(2, actionHandler.getRows().size());
assertEquals(2, propertyContexts.size());
}
private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
@Test
void testUniqueBulletinQueryIsInTimeWindow() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
actionHandler.reset();
final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "WARN", "test bulletin 2", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
}
@Test
void testUniqueBulletinQueryIsOutOfTimeWindow() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
actionHandler.reset();
final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "WARN", "test bulletin 2", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime() - 1);
reportingTask.onTrigger(context);
assertEquals(0, actionHandler.getRows().size());
}
@Test
void testUniqueProvenanceQueryIsInTimeWindow() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select componentId from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
actionHandler.reset();
MockFlowFile mockFlowFile = new MockFlowFile(2L);
ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("2")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 2")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 2")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov2);
currentTime.set(prov2.getEventTime());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
}
@Test
void testUniqueProvenanceQueryIsOutOfTimeWindow() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select componentId from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
assertEquals(1, actionHandler.getRows().size());
actionHandler.reset();
MockFlowFile mockFlowFile = new MockFlowFile(2L);
ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("2")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 2")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 2")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov2);
currentTime.set(prov2.getEventTime() - 1);
reportingTask.onTrigger(context);
assertEquals(0, actionHandler.getRows().size());
}
@Test
void testTimeWindowFromStateMap() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
long testBulletinStartTime = 1609538145L;
long testProvenanceStartTime = 1641074145L;
final Map<String, String> stateMap = new HashMap<>();
stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), String.valueOf(testBulletinStartTime));
stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), String.valueOf(testProvenanceStartTime));
mockStateManager.setState(stateMap, Scope.LOCAL);
final long bulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long provenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(testBulletinStartTime, bulletinStartTime);
assertEquals(testProvenanceStartTime, provenanceStartTime);
final long currentTime = Instant.now().toEpochMilli();
this.currentTime.set(currentTime);
reportingTask.onTrigger(context);
final long updatedBulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long updatedProvenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(currentTime, updatedBulletinStartTime);
assertEquals(currentTime, updatedProvenanceStartTime);
}
private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException {
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final BulletinRepository bulletinRepository = Mockito.mock(BulletinRepository.class);
reportingTask = new MockMetricsEventReportingTask();
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
@ -183,9 +289,8 @@ public class TestMetricsEventReportingTask {
context = Mockito.mock(ReportingContext.class);
Mockito.when(context.isAnalyticsEnabled()).thenReturn(true);
Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
Mockito.when(context.createBulletin(anyString(),any(Severity.class), anyString())).thenReturn(null);
mockStateManager = new MockStateManager(reportingTask);
Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
@ -200,13 +305,8 @@ public class TestMetricsEventReportingTask {
actionHandler = new MockPropertyContextActionHandler();
Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler);
Action action1 = new Action();
action1.setType("LOG");
Action action2 = new Action();
action2.setType("ALERT");
final PropertyValue resValue = Mockito.mock(StandardPropertyValue.class);
rulesEngineService = new MockRulesEngineService(Lists.newArrayList(action1,action2));
MockRulesEngineService rulesEngineService = new MockRulesEngineService();
Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService);
ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
@ -216,10 +316,82 @@ public class TestMetricsEventReportingTask {
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0"));
reportingTask.setup(configContext);
setupMockProvenanceRepository(eventAccess);
setupMockBulletinRepository();
return reportingTask;
}
private static final class MockMetricsEventReportingTask extends MetricsEventReportingTask {
private final class MockMetricsEventReportingTask extends MetricsEventReportingTask {
@Override
public long getCurrentTime() {
return currentTime.get();
}
}
}
private void setupMockBulletinRepository() {
mockBulletinRepository = new MockQueryBulletinRepository();
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "WARN", "test bulletin 1", "testFlowFileUuid"));
Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
}
private void setupMockProvenanceRepository(final EventAccess eventAccess) {
mockProvenanceRepository = new MockProvenanceRepository();
long currentTimeMillis = System.currentTimeMillis();
Map<String, String> previousAttributes = new HashMap<>();
previousAttributes.put("mime.type", "application/json");
previousAttributes.put("test.value", "A");
Map<String, String> updatedAttributes = new HashMap<>(previousAttributes);
updatedAttributes.put("test.value", "B");
// Generate provenance events and put them in a repository
Processor processor = mock(Processor.class);
SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0));
MockProcessSession processSession = new MockProcessSession(sharedState, processor);
MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes());
ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("1")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(currentTimeMillis)
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.setAttributes(previousAttributes, updatedAttributes)
.build();
mockProvenanceRepository.registerEvent(prov1);
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
}
private static class MockQueryBulletinRepository extends MockBulletinRepository {
Map<String, List<Bulletin>> bulletins = new HashMap<>();
@Override
public void addBulletin(Bulletin bulletin) {
bulletins.computeIfAbsent(bulletin.getCategory(), key -> new ArrayList<>())
.add(bulletin);
}
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
return new ArrayList<>(
Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
.orElse(Collections.emptyList())
);
}
@Override
public List<Bulletin> findBulletinsForController() {
return Optional.ofNullable(bulletins.get("controller"))
.orElse(Collections.emptyList());
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.nifi.reporting.sql;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@ -34,12 +35,14 @@ import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinFactory;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
import org.apache.nifi.reporting.util.metrics.MetricNames;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockBulletinRepository;
@ -54,12 +57,14 @@ import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@ -67,21 +72,24 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
public class TestQueryNiFiReportingTask {
class TestQueryNiFiReportingTask {
private ReportingContext context;
private MockQueryNiFiReportingTask reportingTask;
private MockRecordSinkService mockRecordSinkService;
private ProcessGroupStatus status;
private BulletinRepository mockBulletinRepository;
private MockProvenanceRepository mockProvenanceRepository;
private AtomicLong currentTime;
private MockStateManager mockStateManager;
@BeforeEach
public void setup() {
mockRecordSinkService = new MockRecordSinkService();
currentTime = new AtomicLong();
status = new ProcessGroupStatus();
status.setId("1234");
status.setFlowFilesReceived(5);
@ -145,8 +153,6 @@ public class TestQueryNiFiReportingTask {
Collection<ConnectionStatus> nestedConnectionStatuses2 = new ArrayList<>();
nestedConnectionStatuses2.add(nestedConnectionStatus2);
groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
Collection<ProcessGroupStatus> nestedGroupStatuses2 = new ArrayList<>();
nestedGroupStatuses2.add(groupStatus3);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus1);
@ -156,7 +162,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
public void testConnectionStatusTable() throws IOException, InitializationException {
void testConnectionStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc");
@ -193,7 +199,191 @@ public class TestQueryNiFiReportingTask {
}
@Test
public void testJvmMetricsTable() throws IOException, InitializationException {
void testBulletinIsInTimeWindow() throws InitializationException {
String query = "select * from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, query);
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3, rows.size());
final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testBulletinIsOutOfTimeWindow() throws InitializationException {
String query = "select * from BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, query);
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3, rows.size());
final Bulletin bulletin = BulletinFactory.createBulletin("input port", "ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
currentTime.set(bulletin.getTimestamp().getTime() - 1);
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(0, sameRows.size());
}
@Test
void testProvenanceEventIsInTimeWindow() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1001, rows.size());
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(prov1002.getEventTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testProvenanceEventIsOutOfTimeWindow() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1001, rows.size());
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.setEventTime(Instant.now().toEpochMilli())
.setEventDuration(100)
.setTransitUri("test://")
.setSourceSystemFlowFileIdentifier("I am FlowFile 1")
.setAlternateIdentifierUri("remote://test")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(prov1002.getEventTime() - 1);
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(0, sameRows.size());
}
@Test
void testUniqueProvenanceAndBulletinQuery() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3003, rows.size());
final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);
MockFlowFile mockFlowFile = new MockFlowFile(1002L);
ProvenanceEventRecord prov1002 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
.setComponentType("ReportingTask")
.setFlowFileUUID("I am FlowFile 1")
.build();
mockProvenanceRepository.registerEvent(prov1002);
currentTime.set(bulletin.getTimestamp().getTime());
reportingTask.onTrigger(context);
List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
assertEquals(1, sameRows.size());
}
@Test
void testTimeWindowFromStateMap() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
reportingTask = initTask(properties);
long testBulletinStartTime = 1609538145L;
long testProvenanceStartTime = 1641074145L;
final Map<String, String> stateMap = new HashMap<>();
stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(), String.valueOf(testBulletinStartTime));
stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(), String.valueOf(testProvenanceStartTime));
mockStateManager.setState(stateMap, Scope.LOCAL);
final long bulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long provenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(testBulletinStartTime, bulletinStartTime);
assertEquals(testProvenanceStartTime, provenanceStartTime);
final long currentTime = Instant.now().toEpochMilli();
this.currentTime.set(currentTime);
reportingTask.onTrigger(context);
final long updatedBulletinStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
final long updatedProvenanceStartTime = Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
assertEquals(currentTime, updatedBulletinStartTime);
assertEquals(currentTime, updatedProvenanceStartTime);
}
@Test
void testJvmMetricsTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select "
@ -222,7 +412,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
public void testProcessGroupStatusTable() throws IOException, InitializationException {
void testProcessGroupStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc");
@ -247,7 +437,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
public void testNoResults() throws IOException, InitializationException {
void testNoResults() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000");
@ -259,7 +449,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
public void testProvenanceTable() throws IOException, InitializationException {
void testProvenanceTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE order by eventId asc");
@ -304,7 +494,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
public void testBulletinTable() throws IOException, InitializationException {
void testBulletinTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS order by bulletinTimestamp asc");
@ -329,12 +519,12 @@ public class TestQueryNiFiReportingTask {
// Validate the third row
row = rows.get(2);
assertEquals("controller service", row.get("bulletinCategory"));
assertEquals("controller_service", row.get("bulletinCategory"));
assertEquals("ERROR", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
}
private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException {
final ComponentLog logger = mock(ComponentLog.class);
reportingTask = new MockQueryNiFiReportingTask();
@ -342,6 +532,7 @@ public class TestQueryNiFiReportingTask {
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
reportingTask.initialize(initContext);
Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
@ -349,7 +540,10 @@ public class TestQueryNiFiReportingTask {
properties.putAll(customProperties);
context = mock(ReportingContext.class);
Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
mockStateManager = new MockStateManager(reportingTask);
Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
@ -371,7 +565,7 @@ public class TestQueryNiFiReportingTask {
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new MockPropertyValue("0"));
reportingTask.setup(configContext);
MockProvenanceRepository provenanceRepository = new MockProvenanceRepository();
mockProvenanceRepository = new MockProvenanceRepository();
long currentTimeMillis = System.currentTimeMillis();
Map<String, String> previousAttributes = new HashMap<>();
previousAttributes.put("mime.type", "application/json");
@ -385,7 +579,7 @@ public class TestQueryNiFiReportingTask {
MockProcessSession processSession = new MockProcessSession(sharedState, processor);
MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes());
ProvenanceEventRecord prov1 = provenanceRepository.eventBuilder()
ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
@ -399,12 +593,12 @@ public class TestQueryNiFiReportingTask {
.setAttributes(previousAttributes, updatedAttributes)
.build();
provenanceRepository.registerEvent(prov1);
mockProvenanceRepository.registerEvent(prov1);
for (int i = 1; i < 1001; i++) {
String indexString = Integer.toString(i);
mockFlowFile = processSession.createFlowFile(("Test content " + indexString).getBytes());
ProvenanceEventRecord prov = provenanceRepository.eventBuilder()
ProvenanceEventRecord prov = mockProvenanceRepository.eventBuilder()
.fromFlowFile(mockFlowFile)
.setEventType(ProvenanceEventType.DROP)
.setComponentId(indexString)
@ -412,52 +606,48 @@ public class TestQueryNiFiReportingTask {
.setFlowFileUUID("I am FlowFile " + indexString)
.setEventTime(currentTimeMillis - i)
.build();
provenanceRepository.registerEvent(prov);
mockProvenanceRepository.registerEvent(prov);
}
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
MockBulletinRepository bulletinRepository = new MockQueryBulletinRepository();
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("processor", "INFO", "test bulletin 1", "testFlowFileUuid"));
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller service", "ERROR", "test bulletin 2", "testFlowFileUuid"));
Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
mockBulletinRepository = new MockQueryBulletinRepository();
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller", "WARN", "test bulletin 2", "testFlowFileUuid"));
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(), "INFO", "test bulletin 1", "testFlowFileUuid"));
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(), "ERROR", "test bulletin 2", "testFlowFileUuid"));
Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
return reportingTask;
}
private static final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
private final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
@Override
public long getCurrentTime() {
return currentTime.get();
}
}
private static class MockQueryBulletinRepository extends MockBulletinRepository {
List<Bulletin> bulletinList;
public MockQueryBulletinRepository() {
bulletinList = new ArrayList<>();
}
Map<String, List<Bulletin>> bulletins = new HashMap<>();
@Override
public void addBulletin(Bulletin bulletin) {
bulletinList.add(bulletin);
bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new ArrayList<>())
.add(bulletin);
}
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
if (bulletinQuery.getSourceType().equals(ComponentType.PROCESSOR)) {
return Collections.singletonList(bulletinList.get(1));
} else if (bulletinQuery.getSourceType().equals(ComponentType.CONTROLLER_SERVICE)) {
return Collections.singletonList(bulletinList.get(2));
} else {
return Collections.emptyList();
}
return new ArrayList<>(
Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
.orElse(Collections.emptyList()));
}
@Override
public List<Bulletin> findBulletinsForController() {
return Collections.singletonList(bulletinList.get(0));
return Optional.ofNullable(bulletins.get("controller"))
.orElse(Collections.emptyList());
}
}
}

View File

@ -20,18 +20,14 @@ import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.Tuple;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler{
private List<Map<String, Object>> rows = new ArrayList<>();
private List<Tuple<String,Action>> defaultActions = new ArrayList<>();
private List<PropertyContext> propertyContexts = new ArrayList<>();
public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler {
private final List<Map<String, Object>> rows = new ArrayList<>();
private final List<PropertyContext> propertyContexts = new ArrayList<>();
@Override
@ -43,7 +39,6 @@ public class MockPropertyContextActionHandler extends AbstractConfigurableCompon
@Override
public void execute(Action action, Map<String, Object> facts) {
rows.add(facts);
defaultActions.add( new Tuple<>(action.getType(),action));
}
@ -56,15 +51,6 @@ public class MockPropertyContextActionHandler extends AbstractConfigurableCompon
return rows;
}
public List<Tuple<String, Action>> getDefaultActions() {
return defaultActions;
}
public List<Tuple<String,Action>> getDefaultActionsByType(final String type){
return defaultActions.stream().filter(stringActionTuple -> stringActionTuple
.getKey().equalsIgnoreCase(type)).collect(Collectors.toList());
}
public List<PropertyContext> getPropertyContexts() {
return propertyContexts;
}
@ -73,4 +59,9 @@ public class MockPropertyContextActionHandler extends AbstractConfigurableCompon
public String getIdentifier() {
return "MockPropertyContextActionHandler";
}
}
public void reset() {
rows.clear();
propertyContexts.clear();
}
}

View File

@ -18,26 +18,21 @@ package org.apache.nifi.rules.engine;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.rules.Action;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService {
private List<Action> actions;
public MockRulesEngineService(List<Action> actions) {
this.actions = actions;
}
@Override
public List<Action> fireRules(Map<String, Object> facts) {
return actions;
return Collections.singletonList(Mockito.mock(Action.class));
}
@Override
public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
public void initialize(ControllerServiceInitializationContext context) {
}
@Override