NIFI-10699: Add FLOW_CONFIG_HISTORY table to QueryNiFiReportingTask

This closes #6581

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matthew Burgess 2022-10-25 15:19:59 -04:00 committed by exceptionfactory
parent 6a9cb9c10c
commit 09bc5bcb5a
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
9 changed files with 591 additions and 5 deletions

View File

@ -91,6 +91,12 @@
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-user-actions</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-rules-engine-service-api</artifactId>

View File

@ -30,6 +30,7 @@ import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.sql.bulletins.BulletinTable;
import org.apache.nifi.reporting.sql.connectionstatus.ConnectionStatusTable;
import org.apache.nifi.reporting.sql.connectionstatuspredictions.ConnectionStatusPredictionsTable;
import org.apache.nifi.reporting.sql.flowconfighistory.FlowConfigHistoryTable;
import org.apache.nifi.reporting.sql.metrics.JvmMetricsTable;
import org.apache.nifi.reporting.sql.processgroupstatus.ProcessGroupStatusTable;
import org.apache.nifi.reporting.sql.processorstatus.ProcessorStatusTable;
@ -169,6 +170,8 @@ public class MetricsSqlQueryService implements MetricsQueryService {
rootSchema.add("BULLETINS", bulletinTable);
final ProvenanceTable provenanceTable = new ProvenanceTable(context, getLogger());
rootSchema.add("PROVENANCE", provenanceTable);
final FlowConfigHistoryTable flowConfigHistoryTable = new FlowConfigHistoryTable(context, getLogger());
rootSchema.add("FLOW_CONFIG_HISTORY", flowConfigHistoryTable);
rootSchema.setCacheEnabled(false);

View File

@ -41,16 +41,18 @@ import java.util.concurrent.TimeUnit;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.FLOW_CONFIG_HISTORY_END_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.FLOW_CONFIG_HISTORY_START_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
@Tags({"status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "prediction", "process", "group", "provenance", "record", "sql"})
@Tags({"status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "prediction", "process", "group", "provenance", "record", "sql", "flow", "config"})
@CapabilityDescription("Publishes NiFi status information based on the results of a user-specified SQL query. The query may make use of the CONNECTION_STATUS, PROCESSOR_STATUS, "
+ "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables, and can use any functions or capabilities provided by Apache Calcite. Note that the "
+ "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled (see the nifi.analytics.predict.enabled property in nifi.properties). Attempting a "
+ "query on the table when the capability is disabled will cause an error.")
+ "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, FLOW_CONFIG_HISTORY, or PROVENANCE tables, and can use any functions or capabilities provided by "
+ "Apache Calcite. Note that the CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled (see the nifi.analytics.predict.enabled property "
+ "in nifi.properties). Attempting a query on the table when the capability is disabled will cause an error.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last execution time so that on restart the task knows where it left off.")
public class QueryNiFiReportingTask extends AbstractReportingTask implements QueryTimeAware {
@ -92,6 +94,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask implements Que
try {
sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME);
sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME);
sql = processStartAndEndTimes(context, sql, FLOW_CONFIG_HISTORY_START_TIME, FLOW_CONFIG_HISTORY_END_TIME);
getLogger().debug("Executing query: {}", sql);
final QueryResult queryResult = metricsQueryService.query(context, sql);

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql.flowconfighistory;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.ConnectDetails;
import org.apache.nifi.action.details.MoveDetails;
import org.apache.nifi.action.details.PurgeDetails;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingContext;
import java.util.Iterator;
import java.util.List;
public class FlowConfigHistoryEnumerator implements Enumerator<Object> {
private final ReportingContext context;
private final ComponentLog logger;
private final int[] fields;
private Iterator<Action> actionIterator;
private Object currentRow;
private int recordsRead = 0;
public FlowConfigHistoryEnumerator(final ReportingContext context, final ComponentLog logger, final int[] fields) {
this.context = context;
this.logger = logger;
this.fields = fields;
reset();
}
@Override
public Object current() {
return currentRow;
}
@Override
public boolean moveNext() {
currentRow = null;
if (!actionIterator.hasNext()) {
// If we are out of data, close the InputStream. We do this because
// Calcite does not necessarily call our close() method.
close();
try {
onFinish();
} catch (final Exception e) {
logger.error("Failed to perform tasks when enumerator was finished", e);
}
return false;
}
final Action action = actionIterator.next();
currentRow = filterColumns(action);
recordsRead++;
return true;
}
protected int getRecordsRead() {
return recordsRead;
}
protected void onFinish() {
}
private Object filterColumns(final Action action) {
if (action == null) {
return null;
}
final boolean isClustered = context.isClustered();
String nodeId = context.getClusterNodeIdentifier();
if (nodeId == null && isClustered) {
nodeId = "unknown";
}
final Object[] row = new Object[]{
action.getId(),
action.getTimestamp().getTime(),
action.getUserIdentity(),
action.getSourceId(),
action.getSourceName(),
action.getSourceType(),
action.getOperation().toString(),
action.getActionDetails() instanceof ConfigureDetails ? ((ConfigureDetails) action.getActionDetails()).getName() : null,
action.getActionDetails() instanceof ConfigureDetails ? ((ConfigureDetails) action.getActionDetails()).getPreviousValue() : null,
action.getActionDetails() instanceof ConfigureDetails ? ((ConfigureDetails) action.getActionDetails()).getValue() : null,
action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getSourceId() : null,
action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getSourceName() : null,
action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getSourceType() : null,
action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getDestinationId() : null,
action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getDestinationName() : null,
action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getDestinationType() : null,
action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getRelationship() : null,
action.getActionDetails() instanceof MoveDetails ? ((MoveDetails) action.getActionDetails()).getGroup() : null,
action.getActionDetails() instanceof MoveDetails ? ((MoveDetails) action.getActionDetails()).getGroupId() : null,
action.getActionDetails() instanceof MoveDetails ? ((MoveDetails) action.getActionDetails()).getPreviousGroup() : null,
action.getActionDetails() instanceof MoveDetails ? ((MoveDetails) action.getActionDetails()).getPreviousGroupId() : null,
action.getActionDetails() instanceof PurgeDetails ? ((PurgeDetails) action.getActionDetails()).getEndDate().getTime() : null
};
// If we want no fields just return null
if (fields == null) {
return row;
}
// If we want only a single field, then Calcite is going to expect us to return
// the actual value, NOT a 1-element array of values.
if (fields.length == 1) {
final int desiredCellIndex = fields[0];
return row[desiredCellIndex];
}
// Create a new Object array that contains only the desired fields.
final Object[] filtered = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
final int indexToKeep = fields[i];
filtered[i] = row[indexToKeep];
}
return filtered;
}
@Override
public void reset() {
List<Action> fullFlowConfigHistoryList = context.getEventAccess().getFlowChanges(0, Short.MAX_VALUE);
actionIterator = fullFlowConfigHistoryList.iterator();
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql.flowconfighistory;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import java.util.List;
/**
* Planner rule that projects from a {@link FlowConfigHistoryTableScan} scan just the columns
* needed to satisfy a projection. If the projection's expressions are trivial,
* the projection is removed.
*/
public class FlowConfigHistoryProjectTableScanRule extends RelOptRule {
public static final FlowConfigHistoryProjectTableScanRule INSTANCE = new FlowConfigHistoryProjectTableScanRule();
private FlowConfigHistoryProjectTableScanRule() {
super(
operand(LogicalProject.class,
operand(FlowConfigHistoryTableScan.class, none())),
"BulletinProjectTableScanRule");
}
@Override
public void onMatch(RelOptRuleCall call) {
final LogicalProject project = call.rel(0);
final FlowConfigHistoryTableScan scan = call.rel(1);
final int[] fields = getProjectFields(project.getProjects());
if (fields == null) {
// Project contains expressions more complex than just field references.
return;
}
call.transformTo(
new FlowConfigHistoryTableScan(
scan.getCluster(),
scan.getTable(),
scan.flowConfigHistoryTable,
fields));
}
private int[] getProjectFields(List<RexNode> exps) {
final int[] fields = new int[exps.size()];
for (int i = 0; i < exps.size(); i++) {
final RexNode exp = exps.get(i);
if (exp instanceof RexInputRef) {
fields[i] = ((RexInputRef) exp).getIndex();
} else {
return null; // not a simple projection
}
}
return fields;
}
}

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql.flowconfighistory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.Schema.TableType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.util.Pair;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingContext;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class FlowConfigHistoryTable extends AbstractTable implements QueryableTable, TranslatableTable {
private final ComponentLog logger;
private RelDataType relDataType = null;
private final ReportingContext context;
private volatile int maxRecordsRead;
private final Set<FlowConfigHistoryEnumerator> enumerators = new HashSet<>();
/**
* Creates a Flow Configuration History table.
*/
public FlowConfigHistoryTable(final ReportingContext context, final ComponentLog logger) {
this.context = context;
this.logger = logger;
}
@Override
public String toString() {
return "FlowConfigHistoryTable";
}
public void close() {
synchronized (enumerators) {
for (final FlowConfigHistoryEnumerator enumerator : enumerators) {
enumerator.close();
}
}
}
/**
* Returns an enumerable over a given projection of the fields.
*
* <p>
* Called from generated code.
*/
public Enumerable<Object> project(final int[] fields) {
return new AbstractEnumerable<Object>() {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Enumerator<Object> enumerator() {
final FlowConfigHistoryEnumerator flowConfigHistoryEnumerator = new FlowConfigHistoryEnumerator(context, logger, fields) {
@Override
protected void onFinish() {
final int recordCount = getRecordsRead();
if (recordCount > maxRecordsRead) {
maxRecordsRead = recordCount;
}
}
@Override
public void close() {
synchronized (enumerators) {
enumerators.remove(this);
}
super.close();
}
};
synchronized (enumerators) {
enumerators.add(flowConfigHistoryEnumerator);
}
return flowConfigHistoryEnumerator;
}
};
}
public int getRecordsRead() {
return maxRecordsRead;
}
@Override
@SuppressWarnings("rawtypes")
public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) {
return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
}
@Override
public Type getElementType() {
return Object[].class;
}
@Override
public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) {
throw new UnsupportedOperationException();
}
@Override
public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) {
// Request all fields.
final int fieldCount = relOptTable.getRowType().getFieldCount();
final int[] fields = new int[fieldCount];
for (int i = 0; i < fieldCount; i++) {
fields[i] = i;
}
return new FlowConfigHistoryTableScan(context.getCluster(), relOptTable, this, fields);
}
@Override
public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
if (relDataType != null) {
return relDataType;
}
final List<String> names = Arrays.asList(
"actionId",
"actionTimestamp",
"actionUserIdentity",
"actionSourceId",
"actionSourceName",
"actionSourceType",
"actionOperation",
"configureDetailsName",
"configureDetailsPreviousValue",
"configureDetailsValue",
"connectionSourceId",
"connectionSourceName",
"connectionSourceType",
"connectionDestinationId",
"connectionDestinationName",
"connectionDestinationType",
"connectionRelationship",
"moveGroup",
"moveGroupId",
"movePreviousGroup",
"movePreviousGroupId",
"purgeEndDate"
);
final List<RelDataType> types = Arrays.asList(
typeFactory.createJavaType(int.class),
typeFactory.createJavaType(long.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(String.class),
typeFactory.createJavaType(long.class)
);
relDataType = typeFactory.createStructType(Pair.zip(names, types));
return relDataType;
}
@Override
public TableType getJdbcTableType() {
return TableType.TEMPORARY_TABLE;
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.sql.flowconfighistory;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.linq4j.tree.Blocks;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import java.util.List;
/**
* Relational expression representing a query for bulletin information.
*
* <p>
* Like any table scan, it serves as a leaf node of a query tree.
* </p>
*/
public class FlowConfigHistoryTableScan extends TableScan implements EnumerableRel {
final FlowConfigHistoryTable flowConfigHistoryTable;
final int[] fields;
protected FlowConfigHistoryTableScan(final RelOptCluster cluster, final RelOptTable table, final FlowConfigHistoryTable flowConfigHistoryTable, final int[] fields) {
super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
this.flowConfigHistoryTable = flowConfigHistoryTable;
this.fields = fields;
}
@Override
public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) {
return new FlowConfigHistoryTableScan(getCluster(), table, flowConfigHistoryTable, fields);
}
@Override
public RelWriter explainTerms(final RelWriter pw) {
return super.explainTerms(pw).item("fields", Primitive.asList(fields));
}
@Override
public RelDataType deriveRowType() {
final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
final RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder();
for (int field : fields) {
builder.add(fieldList.get(field));
}
return builder.build();
}
@Override
public void register(RelOptPlanner planner) {
planner.addRule(FlowConfigHistoryProjectTableScanRule.INSTANCE);
}
@Override
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
return implementor.result(physType, Blocks.toBlock(
Expressions.call(table.getExpression(FlowConfigHistoryTable.class), "project", Expressions.constant(fields))));
}
}

View File

@ -21,7 +21,9 @@ public enum TrackedQueryTime {
BULLETIN_START_TIME("$bulletinStartTime"),
BULLETIN_END_TIME("$bulletinEndTime"),
PROVENANCE_START_TIME("$provenanceStartTime"),
PROVENANCE_END_TIME("$provenanceEndTime");
PROVENANCE_END_TIME("$provenanceEndTime"),
FLOW_CONFIG_HISTORY_START_TIME("$flowConfigHistoryStartTime"),
FLOW_CONFIG_HISTORY_END_TIME("$flowConfigHistoryEndTime");
private final String sqlPlaceholder;

View File

@ -17,6 +17,11 @@
package org.apache.nifi.reporting.sql;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -61,6 +66,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -74,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
class TestQueryNiFiReportingTask {
@ -86,6 +93,7 @@ class TestQueryNiFiReportingTask {
private MockProvenanceRepository mockProvenanceRepository;
private AtomicLong currentTime;
private MockStateManager mockStateManager;
private List<Action> flowConfigHistory;
@BeforeEach
public void setup() {
@ -105,6 +113,7 @@ class TestQueryNiFiReportingTask {
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setId("proc");
procStatus.setName("Processor 1");
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
@ -159,6 +168,21 @@ class TestQueryNiFiReportingTask {
groupStatuses.add(groupStatus3);
status.setProcessGroupStatus(groupStatuses);
// Populate flow config history
FlowChangeAction action1 = new FlowChangeAction();
action1.setId(123);
action1.setTimestamp(new Date());
action1.setUserIdentity("test");
action1.setSourceId("proc");
action1.setSourceName("Processor 1");
action1.setSourceType(Component.Processor);
action1.setOperation(Operation.Configure);
FlowChangeConfigureDetails configureDetails1 = new FlowChangeConfigureDetails();
configureDetails1.setName("property1");
configureDetails1.setPreviousValue("1");
configureDetails1.setValue("2");
action1.setActionDetails(configureDetails1);
flowConfigHistory = Collections.singletonList(action1);
}
@Test
@ -524,6 +548,24 @@ class TestQueryNiFiReportingTask {
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
}
@Test
void testFlowConfigHistoryTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from FLOW_CONFIG_HISTORY");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(1, rows.size());
// Validate the first row
Map<String, Object> row = rows.get(0);
assertEquals(22, row.size());
// Verify the first row contents
assertEquals(123, row.get("actionId"));
assertEquals("Configure", row.get("actionOperation"));
}
private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException {
final ComponentLog logger = mock(ComponentLog.class);
@ -552,6 +594,7 @@ class TestQueryNiFiReportingTask {
final EventAccess eventAccess = mock(EventAccess.class);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
Mockito.when(eventAccess.getFlowChanges(anyInt(), anyInt())).thenReturn(flowConfigHistory);
final PropertyValue pValue = mock(StandardPropertyValue.class);
mockRecordSinkService = new MockRecordSinkService();