mirror of https://github.com/apache/nifi.git
NIFI-11067 Delete Property History when changing Sensitive status
This closes #7082 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
c819c4de94
commit
ffaf81ec46
|
@ -55,6 +55,15 @@ public interface ActionDAO {
|
|||
*/
|
||||
Map<String, List<PreviousValue>> getPreviousValues(String componentId);
|
||||
|
||||
/**
|
||||
* Deletes the history of a component's property.
|
||||
*
|
||||
* @param propertyName the name of the property
|
||||
* @param componentId to delete previous values of
|
||||
*/
|
||||
void deletePreviousValues(String propertyName, String componentId);
|
||||
|
||||
|
||||
/**
|
||||
* Finds the specified action.
|
||||
*
|
||||
|
|
|
@ -186,6 +186,18 @@ public class StandardActionDAO implements ActionDAO {
|
|||
+ "ORDER BY A.ACTION_TIMESTAMP DESC "
|
||||
+ "LIMIT 4";
|
||||
|
||||
private static final String DELETE_PREVIOUS_VALUES = "DELETE FROM CONFIGURE_DETAILS " +
|
||||
"WHERE NAME = ? " +
|
||||
"AND ACTION_ID IN " +
|
||||
"(SELECT ID FROM ACTION WHERE SOURCE_ID = ?)";
|
||||
private static final String ACTION_TIMESTAMP = "ACTION_TIMESTAMP";
|
||||
private static final String SOURCE_NAME = "SOURCE_NAME";
|
||||
private static final String SOURCE_TYPE = "SOURCE_TYPE";
|
||||
private static final String OPERATION = "OPERATION";
|
||||
private static final String IDENTITY = "IDENTITY";
|
||||
private static final String ACTION_ID = "ACTION_ID";
|
||||
private static final String SOURCE_ID = "SOURCE_ID";
|
||||
|
||||
private final Connection connection;
|
||||
private final Map<String, String> columnMap;
|
||||
|
||||
|
@ -194,11 +206,11 @@ public class StandardActionDAO implements ActionDAO {
|
|||
|
||||
// initialize the column mappings
|
||||
this.columnMap = new HashMap<>();
|
||||
this.columnMap.put("timestamp", "ACTION_TIMESTAMP");
|
||||
this.columnMap.put("sourceName", "SOURCE_NAME");
|
||||
this.columnMap.put("sourceType", "SOURCE_TYPE");
|
||||
this.columnMap.put("operation", "OPERATION");
|
||||
this.columnMap.put("userIdentity", "IDENTITY");
|
||||
this.columnMap.put("timestamp", ACTION_TIMESTAMP);
|
||||
this.columnMap.put("sourceName", SOURCE_NAME);
|
||||
this.columnMap.put("sourceType", SOURCE_TYPE);
|
||||
this.columnMap.put("operation", OPERATION);
|
||||
this.columnMap.put("userIdentity", IDENTITY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -421,9 +433,8 @@ public class StandardActionDAO implements ActionDAO {
|
|||
|
||||
@Override
|
||||
public History findActions(HistoryQuery historyQuery) throws DataAccessException {
|
||||
|
||||
// get the sort column
|
||||
String sortColumn = "ACTION_TIMESTAMP";
|
||||
String sortColumn = ACTION_TIMESTAMP;
|
||||
if (StringUtils.isNotBlank(historyQuery.getSortColumn())) {
|
||||
String rawColumnName = historyQuery.getSortColumn();
|
||||
if (!columnMap.containsKey(rawColumnName)) {
|
||||
|
@ -433,10 +444,7 @@ public class StandardActionDAO implements ActionDAO {
|
|||
}
|
||||
|
||||
// get the sort order
|
||||
String sortOrder = "desc";
|
||||
if (StringUtils.isNotBlank(historyQuery.getSortOrder())) {
|
||||
sortOrder = historyQuery.getSortOrder();
|
||||
}
|
||||
String sortOrder = StringUtils.defaultIfBlank(historyQuery.getSortOrder(), "desc");
|
||||
|
||||
History actionResult = new History();
|
||||
Collection<Action> actions = new ArrayList<>();
|
||||
|
@ -554,17 +562,17 @@ public class StandardActionDAO implements ActionDAO {
|
|||
// create each corresponding action
|
||||
while (rs.next()) {
|
||||
final Integer actionId = rs.getInt("ID");
|
||||
final Operation operation = Operation.valueOf(rs.getString("OPERATION"));
|
||||
final Component component = Component.valueOf(rs.getString("SOURCE_TYPE"));
|
||||
final Operation operation = Operation.valueOf(rs.getString(OPERATION));
|
||||
final Component component = Component.valueOf(rs.getString(SOURCE_TYPE));
|
||||
|
||||
FlowChangeAction action = new FlowChangeAction();
|
||||
action.setId(actionId);
|
||||
action.setUserIdentity(rs.getString("IDENTITY"));
|
||||
action.setOperation(Operation.valueOf(rs.getString("OPERATION")));
|
||||
action.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime()));
|
||||
action.setSourceId(rs.getString("SOURCE_ID"));
|
||||
action.setSourceName(rs.getString("SOURCE_NAME"));
|
||||
action.setSourceType(Component.valueOf(rs.getString("SOURCE_TYPE")));
|
||||
action.setUserIdentity(rs.getString(IDENTITY));
|
||||
action.setOperation(Operation.valueOf(rs.getString(OPERATION)));
|
||||
action.setTimestamp(new Date(rs.getTimestamp(ACTION_TIMESTAMP).getTime()));
|
||||
action.setSourceId(rs.getString(SOURCE_ID));
|
||||
action.setSourceName(rs.getString(SOURCE_NAME));
|
||||
action.setSourceType(Component.valueOf(rs.getString(SOURCE_TYPE)));
|
||||
|
||||
// get the component details if appropriate
|
||||
ComponentDetails componentDetails = null;
|
||||
|
@ -627,17 +635,17 @@ public class StandardActionDAO implements ActionDAO {
|
|||
|
||||
// ensure results
|
||||
if (rs.next()) {
|
||||
Operation operation = Operation.valueOf(rs.getString("OPERATION"));
|
||||
Component component = Component.valueOf(rs.getString("SOURCE_TYPE"));
|
||||
Operation operation = Operation.valueOf(rs.getString(OPERATION));
|
||||
Component component = Component.valueOf(rs.getString(SOURCE_TYPE));
|
||||
|
||||
// populate the action
|
||||
action = new FlowChangeAction();
|
||||
action.setId(rs.getInt("ID"));
|
||||
action.setUserIdentity(rs.getString("IDENTITY"));
|
||||
action.setUserIdentity(rs.getString(IDENTITY));
|
||||
action.setOperation(operation);
|
||||
action.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime()));
|
||||
action.setSourceId(rs.getString("SOURCE_ID"));
|
||||
action.setSourceName(rs.getString("SOURCE_NAME"));
|
||||
action.setTimestamp(new Date(rs.getTimestamp(ACTION_TIMESTAMP).getTime()));
|
||||
action.setSourceId(rs.getString(SOURCE_ID));
|
||||
action.setSourceName(rs.getString(SOURCE_NAME));
|
||||
action.setSourceType(component);
|
||||
|
||||
// get the component details if appropriate
|
||||
|
@ -778,12 +786,12 @@ public class StandardActionDAO implements ActionDAO {
|
|||
|
||||
// ensure results
|
||||
if (rs.next()) {
|
||||
final Component sourceComponent = Component.valueOf(rs.getString("SOURCE_TYPE"));
|
||||
final Component sourceComponent = Component.valueOf(rs.getString(SOURCE_TYPE));
|
||||
final Component destinationComponent = Component.valueOf(rs.getString("DESTINATION_TYPE"));
|
||||
|
||||
connectionDetails = new FlowChangeConnectDetails();
|
||||
connectionDetails.setSourceId(rs.getString("SOURCE_ID"));
|
||||
connectionDetails.setSourceName(rs.getString("SOURCE_NAME"));
|
||||
connectionDetails.setSourceId(rs.getString(SOURCE_ID));
|
||||
connectionDetails.setSourceName(rs.getString(SOURCE_NAME));
|
||||
connectionDetails.setSourceType(sourceComponent);
|
||||
connectionDetails.setRelationship(rs.getString("RELATIONSHIP"));
|
||||
connectionDetails.setDestinationId(rs.getString("DESTINATION_ID"));
|
||||
|
@ -904,8 +912,8 @@ public class StandardActionDAO implements ActionDAO {
|
|||
// get the previous value
|
||||
final PreviousValue previousValue = new PreviousValue();
|
||||
previousValue.setPreviousValue(rs.getString("VALUE"));
|
||||
previousValue.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime()));
|
||||
previousValue.setUserIdentity(rs.getString("IDENTITY"));
|
||||
previousValue.setTimestamp(new Date(rs.getTimestamp(ACTION_TIMESTAMP).getTime()));
|
||||
previousValue.setUserIdentity(rs.getString(IDENTITY));
|
||||
previousValues.add(previousValue);
|
||||
}
|
||||
} catch (SQLException sqle) {
|
||||
|
@ -927,13 +935,13 @@ public class StandardActionDAO implements ActionDAO {
|
|||
// -----------------
|
||||
|
||||
// create the move delete statement
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "PROCESSOR_DETAILS", "ACTION_ID"));
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "PROCESSOR_DETAILS", ACTION_ID));
|
||||
statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime()));
|
||||
statement.executeUpdate();
|
||||
statement.close();
|
||||
|
||||
// create the move delete statement
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "REMOTE_PROCESS_GROUP_DETAILS", "ACTION_ID"));
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "REMOTE_PROCESS_GROUP_DETAILS", ACTION_ID));
|
||||
statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime()));
|
||||
statement.executeUpdate();
|
||||
statement.close();
|
||||
|
@ -942,25 +950,25 @@ public class StandardActionDAO implements ActionDAO {
|
|||
// action details
|
||||
// --------------
|
||||
// create the move delete statement
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "MOVE_DETAILS", "ACTION_ID"));
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "MOVE_DETAILS", ACTION_ID));
|
||||
statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime()));
|
||||
statement.executeUpdate();
|
||||
statement.close();
|
||||
|
||||
// create the configure delete statement
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "CONFIGURE_DETAILS", "ACTION_ID"));
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "CONFIGURE_DETAILS", ACTION_ID));
|
||||
statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime()));
|
||||
statement.executeUpdate();
|
||||
statement.close();
|
||||
|
||||
// create the connect delete statement
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "CONNECT_DETAILS", "ACTION_ID"));
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "CONNECT_DETAILS", ACTION_ID));
|
||||
statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime()));
|
||||
statement.executeUpdate();
|
||||
statement.close();
|
||||
|
||||
// create the relationship delete statement
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "PURGE_DETAILS", "ACTION_ID"));
|
||||
statement = connection.prepareStatement(String.format(DELETE_SPECIFIC_ACTIONS, "PURGE_DETAILS", ACTION_ID));
|
||||
statement.setTimestamp(1, new java.sql.Timestamp(endDate.getTime()));
|
||||
statement.executeUpdate();
|
||||
statement.close();
|
||||
|
@ -979,4 +987,20 @@ public class StandardActionDAO implements ActionDAO {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deletePreviousValues(String propertyName, String componentId) {
|
||||
PreparedStatement statement = null;
|
||||
try {
|
||||
statement = connection.prepareStatement(DELETE_PREVIOUS_VALUES);
|
||||
statement.setString(1, propertyName);
|
||||
statement.setString(2, componentId);
|
||||
statement.executeUpdate();
|
||||
statement.close();
|
||||
} catch (SQLException sqle) {
|
||||
throw new DataAccessException(sqle);
|
||||
} finally {
|
||||
RepositoryUtils.closeQuietly(statement);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -46,6 +46,14 @@ public interface AuditService {
|
|||
*/
|
||||
Map<String, List<PreviousValue>> getPreviousValues(String componentId);
|
||||
|
||||
/**
|
||||
* Deletes the history of a component's property.
|
||||
*
|
||||
* @param propertyName the name of the property
|
||||
* @param componentId to delete previous values of
|
||||
*/
|
||||
void deletePreviousValues(String propertyName, String componentId);
|
||||
|
||||
/**
|
||||
* Get the actions within the given date range.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.admin.service.action;
|
||||
|
||||
import org.apache.nifi.admin.dao.ActionDAO;
|
||||
import org.apache.nifi.admin.dao.DAOFactory;
|
||||
|
||||
/**
|
||||
* Purges actions up to a specified end date.
|
||||
*/
|
||||
public class DeletePreviousValues implements AdministrationAction<Void> {
|
||||
|
||||
private final String propertyName;
|
||||
private final String componentId;
|
||||
|
||||
public DeletePreviousValues(String propertyName, String componentId) {
|
||||
this.propertyName = propertyName;
|
||||
this.componentId = componentId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void execute(DAOFactory daoFactory) {
|
||||
ActionDAO actionDao = daoFactory.getActionDAO();
|
||||
|
||||
actionDao.deletePreviousValues(propertyName, componentId);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.admin.dao.DataAccessException;
|
|||
import org.apache.nifi.admin.service.AdministrationException;
|
||||
import org.apache.nifi.admin.service.AuditService;
|
||||
import org.apache.nifi.admin.service.action.AddActionsAction;
|
||||
import org.apache.nifi.admin.service.action.DeletePreviousValues;
|
||||
import org.apache.nifi.admin.service.action.GetActionAction;
|
||||
import org.apache.nifi.admin.service.action.GetActionsAction;
|
||||
import org.apache.nifi.admin.service.action.GetPreviousValues;
|
||||
|
@ -31,8 +32,6 @@ import org.apache.nifi.admin.service.transaction.TransactionException;
|
|||
import org.apache.nifi.history.History;
|
||||
import org.apache.nifi.history.HistoryQuery;
|
||||
import org.apache.nifi.history.PreviousValue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
@ -46,8 +45,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
*/
|
||||
public class StandardAuditService implements AuditService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(StandardAuditService.class);
|
||||
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
|
||||
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
|
||||
|
@ -111,6 +108,33 @@ public class StandardAuditService implements AuditService {
|
|||
return previousValues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deletePreviousValues(String propertyName, String componentId) {
|
||||
Transaction transaction = null;
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
// start the transaction
|
||||
transaction = transactionBuilder.start();
|
||||
|
||||
// seed the accounts
|
||||
DeletePreviousValues deleteAction = new DeletePreviousValues(propertyName, componentId);
|
||||
transaction.execute(deleteAction);
|
||||
|
||||
// commit the transaction
|
||||
transaction.commit();
|
||||
} catch (TransactionException | DataAccessException te) {
|
||||
rollback(transaction);
|
||||
throw new AdministrationException(te);
|
||||
} catch (Throwable t) {
|
||||
rollback(transaction);
|
||||
throw t;
|
||||
} finally {
|
||||
closeQuietly(transaction);
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public History getActions(HistoryQuery query) {
|
||||
Transaction transaction = null;
|
||||
|
|
|
@ -38,6 +38,10 @@ public class NopAuditService implements AuditService {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deletePreviousValues(String propertyName, String componentId) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public History getActions(final HistoryQuery actionQuery) {
|
||||
return null;
|
||||
|
|
|
@ -152,6 +152,9 @@ public class ControllerServiceAuditor extends NiFiAuditor {
|
|||
final PropertyDescriptor propertyDescriptor = controllerService.getPropertyDescriptor(property);
|
||||
// Evaluate both Property Descriptor status and whether the client requested a new Sensitive Dynamic Property
|
||||
if (propertyDescriptor != null && (propertyDescriptor.isSensitive() || sensitiveDynamicPropertyNames.contains(property))) {
|
||||
// Delete previous property history
|
||||
deletePreviousValues(propertyDescriptor.getName(), controllerService.getIdentifier(), logger);
|
||||
|
||||
if (newValue != null) {
|
||||
newValue = SENSITIVE_VALUE_PLACEHOLDER;
|
||||
}
|
||||
|
|
|
@ -65,7 +65,18 @@ public abstract class NiFiAuditor {
|
|||
try {
|
||||
auditService.addActions(actions);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Unable to record actions: " + t.getMessage());
|
||||
logger.warn("Unable to record actions: ", t);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.warn(StringUtils.EMPTY, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void deletePreviousValues(String propertyName, String componentId, Logger logger) {
|
||||
try {
|
||||
auditService.deletePreviousValues(propertyName, componentId);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Unable to delete property history", t);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.warn(StringUtils.EMPTY, t);
|
||||
}
|
||||
|
@ -88,7 +99,7 @@ public abstract class NiFiAuditor {
|
|||
moveDetails.setGroupId(newGroup.getIdentifier());
|
||||
moveDetails.setGroup(newGroup.getName());
|
||||
} else {
|
||||
logger.warn(String.format("Unable to record move action because old (%s) and new (%s) groups could not be found.", previousGroupId, newGroupId));
|
||||
logger.warn("Unable to record move action because old [{}] and new [{}] groups could not be found.", previousGroupId, newGroupId);
|
||||
}
|
||||
|
||||
return moveDetails;
|
||||
|
|
|
@ -172,6 +172,9 @@ public class ProcessorAuditor extends NiFiAuditor {
|
|||
final PropertyDescriptor propertyDescriptor = processor.getPropertyDescriptor(property);
|
||||
// Evaluate both Property Descriptor status and whether the client requested a new Sensitive Dynamic Property
|
||||
if (propertyDescriptor != null && (propertyDescriptor.isSensitive() || sensitiveDynamicPropertyNames.contains(property))) {
|
||||
// Delete previous property history
|
||||
deletePreviousValues(propertyDescriptor.getName(), processor.getIdentifier(), logger);
|
||||
|
||||
if (newValue != null) {
|
||||
newValue = SENSITIVE_VALUE_PLACEHOLDER;
|
||||
}
|
||||
|
|
|
@ -145,6 +145,9 @@ public class ReportingTaskAuditor extends NiFiAuditor {
|
|||
final PropertyDescriptor propertyDescriptor = reportingTask.getPropertyDescriptor(property);
|
||||
// Evaluate both Property Descriptor status and whether the client requested a new Sensitive Dynamic Property
|
||||
if (propertyDescriptor != null && (propertyDescriptor.isSensitive() || sensitiveDynamicPropertyNames.contains(property))) {
|
||||
// Delete previous property history
|
||||
deletePreviousValues(propertyDescriptor.getName(), reportingTask.getIdentifier(), logger);
|
||||
|
||||
if (newValue != null) {
|
||||
newValue = SENSITIVE_VALUE_PLACEHOLDER;
|
||||
}
|
||||
|
|
|
@ -55,8 +55,6 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
|
|||
import org.apache.nifi.web.dao.ComponentStateDAO;
|
||||
import org.apache.nifi.web.dao.ProcessorDAO;
|
||||
import org.quartz.CronExpression;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.URL;
|
||||
import java.text.ParseException;
|
||||
|
@ -73,7 +71,6 @@ import java.util.stream.Collectors;
|
|||
|
||||
public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(StandardProcessorDAO.class);
|
||||
private FlowController flowController;
|
||||
private ComponentStateDAO componentStateDAO;
|
||||
|
||||
|
|
|
@ -0,0 +1,363 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.audit;
|
||||
|
||||
import org.apache.nifi.action.Action;
|
||||
import org.apache.nifi.action.Component;
|
||||
import org.apache.nifi.action.Operation;
|
||||
import org.apache.nifi.action.details.ActionDetails;
|
||||
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
|
||||
import org.apache.nifi.admin.service.AuditService;
|
||||
import org.apache.nifi.authorization.user.NiFiUser;
|
||||
import org.apache.nifi.authorization.user.NiFiUserDetails;
|
||||
import org.apache.nifi.authorization.user.StandardNiFiUser;
|
||||
import org.apache.nifi.bundle.Bundle;
|
||||
import org.apache.nifi.bundle.BundleCoordinate;
|
||||
import org.apache.nifi.bundle.BundleDetails;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.components.state.StateManagerProvider;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.ProcessorNode;
|
||||
import org.apache.nifi.controller.flow.FlowManager;
|
||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.nar.ExtensionManager;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.web.api.dto.BundleDTO;
|
||||
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
|
||||
import org.apache.nifi.web.api.dto.ProcessorDTO;
|
||||
import org.apache.nifi.web.dao.ProcessGroupDAO;
|
||||
import org.apache.nifi.web.dao.impl.StandardProcessGroupDAO;
|
||||
import org.apache.nifi.web.dao.impl.StandardProcessorDAO;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.EnableAspectJAutoProxy;
|
||||
import org.springframework.security.core.Authentication;
|
||||
import org.springframework.security.core.context.SecurityContextHolder;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
||||
@ExtendWith({SpringExtension.class, MockitoExtension.class})
|
||||
@ContextConfiguration(classes = {TestProcessorAuditor.AuditorConfiguration.class})
|
||||
class TestProcessorAuditor {
|
||||
|
||||
private static final String PROC_1 = "processor1";
|
||||
private static final String PN_SOURCE_NAME = "sourceName1";
|
||||
private static final String PN_ID = "processorNodeId1";
|
||||
private static final String GROUP_ID = "group-1";
|
||||
private static final String USER_IDENTITY = "user-id";
|
||||
private static final BundleCoordinate BUNDLE_COORDINATE = new BundleCoordinate("org.apache.nifi", "nifi-processor-nar", "0.0.0");
|
||||
|
||||
@Autowired
|
||||
private StandardProcessorDAO processorDao;
|
||||
@Autowired
|
||||
private ProcessorAuditor processorAuditor;
|
||||
|
||||
@Mock
|
||||
private AuditService auditService;
|
||||
@Mock
|
||||
private Authentication authentication;
|
||||
@Mock
|
||||
private Processor processor;
|
||||
@Mock
|
||||
private ExtensionManager extensionManager;
|
||||
@Mock
|
||||
private FlowController flowController;
|
||||
@Mock
|
||||
private FlowManager flowManager;
|
||||
@Mock
|
||||
private ProcessGroup processGroup;
|
||||
@Mock
|
||||
private ProcessorNode mockProcessorNode;
|
||||
@Mock
|
||||
private StateManagerProvider mockStateManagerProvider;
|
||||
@Mock
|
||||
private StateManager mockStateManager;
|
||||
@Mock
|
||||
private NiFiUserDetails userDetail;
|
||||
|
||||
@Captor
|
||||
private ArgumentCaptor<List<Action>> actionsArgumentCaptor;
|
||||
|
||||
@BeforeEach
|
||||
void init() {
|
||||
SecurityContextHolder.getContext().setAuthentication(authentication);
|
||||
final NiFiUser user = new StandardNiFiUser.Builder().identity(USER_IDENTITY).build();
|
||||
userDetail = new NiFiUserDetails(user);
|
||||
when(authentication.getPrincipal()).thenReturn(userDetail);
|
||||
|
||||
when(flowController.getFlowManager()).thenReturn(flowManager);
|
||||
|
||||
processorDao.setFlowController(flowController);
|
||||
processorAuditor.setAuditService(auditService);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCreateProcessorAdvice() {
|
||||
final ProcessorDTO processorDto = getProcessorDto();
|
||||
|
||||
when(mockProcessorNode.getIdentifier()).thenReturn(PN_ID);
|
||||
when(mockProcessorNode.getName()).thenReturn(PN_SOURCE_NAME);
|
||||
when(mockProcessorNode.getProcessor()).thenReturn(processor);
|
||||
when(mockProcessorNode.getCanonicalClassName()).thenReturn(mockProcessorNode.getClass().getCanonicalName());
|
||||
|
||||
when(processor.getIdentifier()).thenReturn(PN_ID);
|
||||
|
||||
when(flowController.getExtensionManager()).thenReturn(extensionManager);
|
||||
when(flowController.getControllerServiceProvider()).thenReturn(mock(ControllerServiceProvider.class));
|
||||
when(flowController.getStateManagerProvider()).thenReturn(mockStateManagerProvider);
|
||||
|
||||
when(flowManager.getGroup(GROUP_ID)).thenReturn(processGroup);
|
||||
when(flowManager.createProcessor(anyString(), anyString(), any())).thenReturn(mockProcessorNode);
|
||||
|
||||
final Bundle bundle = getBundle();
|
||||
|
||||
when(extensionManager.getBundle(any(BundleCoordinate.class))).thenReturn(bundle);
|
||||
when(extensionManager.getBundles(anyString())).thenReturn(Collections.singletonList(bundle));
|
||||
|
||||
|
||||
when(mockStateManagerProvider.getStateManager(PN_ID)).thenReturn(mockStateManager);
|
||||
|
||||
final ProcessorNode processor = processorDao.createProcessor(GROUP_ID, processorDto);
|
||||
|
||||
assertNotNull(processor);
|
||||
verify(auditService).addActions(actionsArgumentCaptor.capture());
|
||||
final List<Action> actions = actionsArgumentCaptor.getValue();
|
||||
assertActionFound(actions, Operation.Add);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRemoveProcessorAdvice() {
|
||||
when(flowManager.getRootGroup()).thenReturn(processGroup);
|
||||
|
||||
ProcessorNode mockProcessorNode = mock(ProcessorNode.class);
|
||||
when(mockProcessorNode.getIdentifier()).thenReturn(PN_ID);
|
||||
when(mockProcessorNode.getName()).thenReturn(PN_SOURCE_NAME);
|
||||
when(mockProcessorNode.getProcessGroup()).thenReturn(processGroup);
|
||||
|
||||
when(processGroup.findProcessor(PN_ID)).thenReturn(mockProcessorNode);
|
||||
|
||||
processorDao.deleteProcessor(PN_ID);
|
||||
|
||||
verify(auditService).addActions(actionsArgumentCaptor.capture());
|
||||
final List<Action> actions = actionsArgumentCaptor.getValue();
|
||||
assertActionFound(actions, Operation.Remove);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpdateProcessorAdvice() {
|
||||
when(flowController.getExtensionManager()).thenReturn(extensionManager);
|
||||
when(flowManager.getRootGroup()).thenReturn(processGroup);
|
||||
|
||||
final Bundle bundle = getBundle();
|
||||
|
||||
when(extensionManager.getBundle(any(BundleCoordinate.class))).thenReturn(bundle);
|
||||
when(extensionManager.getBundles(anyString())).thenReturn(Collections.singletonList(bundle));
|
||||
|
||||
when(mockProcessorNode.getIdentifier()).thenReturn(PN_ID);
|
||||
when(mockProcessorNode.getName()).thenReturn(PN_SOURCE_NAME);
|
||||
when(mockProcessorNode.getBundleCoordinate()).thenReturn(BUNDLE_COORDINATE);
|
||||
when(mockProcessorNode.getCanonicalClassName()).thenReturn(ProcessorNode.class.getCanonicalName());
|
||||
when(mockProcessorNode.getProcessGroup()).thenReturn(processGroup);
|
||||
|
||||
when(processGroup.findProcessor(PN_ID)).thenReturn(mockProcessorNode);
|
||||
|
||||
final ProcessorDTO processorDto = getProcessorDto();
|
||||
ProcessorConfigDTO config = new ProcessorConfigDTO();
|
||||
config.setComments("comment1");
|
||||
config.setRunDurationMillis(100L);
|
||||
|
||||
processorDto.setConfig(config);
|
||||
processorDto.setId(PN_ID);
|
||||
|
||||
processorDao.updateProcessor(processorDto);
|
||||
|
||||
verify(auditService).addActions(actionsArgumentCaptor.capture());
|
||||
final List<Action> actions = actionsArgumentCaptor.getValue();
|
||||
final Action action = assertActionFound(actions, Operation.Configure);
|
||||
|
||||
final ActionDetails actionDetails = action.getActionDetails();
|
||||
assertUpdateActionDetailsFound(actionDetails);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSensitivePropertyDeletesHistory() {
|
||||
when(flowController.getExtensionManager()).thenReturn(extensionManager);
|
||||
when(flowManager.getRootGroup()).thenReturn(processGroup);
|
||||
|
||||
final Bundle bundle = getBundle();
|
||||
|
||||
when(extensionManager.getBundle(any(BundleCoordinate.class))).thenReturn(bundle);
|
||||
when(extensionManager.getBundles(anyString())).thenReturn(Collections.singletonList(bundle));
|
||||
|
||||
when(mockProcessorNode.getIdentifier()).thenReturn(PN_ID);
|
||||
when(mockProcessorNode.getName()).thenReturn(PN_SOURCE_NAME);
|
||||
when(mockProcessorNode.getBundleCoordinate()).thenReturn(BUNDLE_COORDINATE);
|
||||
when(mockProcessorNode.getCanonicalClassName()).thenReturn(ProcessorNode.class.getCanonicalName());
|
||||
when(mockProcessorNode.getProcessGroup()).thenReturn(processGroup);
|
||||
|
||||
final String propertyName = "sensitive-property-descriptor-1";
|
||||
|
||||
PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder()
|
||||
.name(propertyName)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
when(mockProcessorNode.getPropertyDescriptor("dynamicSensitiveProperty1")).thenReturn(propertyDescriptor);
|
||||
|
||||
when(processGroup.findProcessor(PN_ID)).thenReturn(mockProcessorNode);
|
||||
|
||||
final ProcessorDTO processorDto = getProcessorDto();
|
||||
ProcessorConfigDTO config = new ProcessorConfigDTO();
|
||||
config.setProperties(Collections.singletonMap("dynamicSensitiveProperty1", "asd"));
|
||||
config.setSensitiveDynamicPropertyNames(Collections.singleton("dynamicSensitiveProperty1"));
|
||||
config.setRunDurationMillis(100L);
|
||||
|
||||
processorDto.setConfig(config);
|
||||
processorDto.setId(PN_ID);
|
||||
|
||||
processorDao.updateProcessor(processorDto);
|
||||
|
||||
ArgumentCaptor<String> propertyNameCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<String> componentIdCaptor = ArgumentCaptor.forClass(String.class);
|
||||
|
||||
verify(auditService).deletePreviousValues(propertyNameCaptor.capture(), componentIdCaptor.capture());
|
||||
|
||||
assertEquals(propertyName, propertyNameCaptor.getValue());
|
||||
assertEquals(PN_ID, componentIdCaptor.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpdateProcessorAdviceProcessorUnchanged() {
|
||||
when(flowController.getExtensionManager()).thenReturn(extensionManager);
|
||||
when(flowManager.getRootGroup()).thenReturn(processGroup);
|
||||
|
||||
final Bundle bundle = getBundle();
|
||||
|
||||
when(extensionManager.getBundle(any(BundleCoordinate.class))).thenReturn(bundle);
|
||||
when(extensionManager.getBundles(anyString())).thenReturn(Collections.singletonList(bundle));
|
||||
|
||||
when(mockProcessorNode.getIdentifier()).thenReturn(PN_ID);
|
||||
when(mockProcessorNode.getBundleCoordinate()).thenReturn(BUNDLE_COORDINATE);
|
||||
when(mockProcessorNode.getCanonicalClassName()).thenReturn(ProcessorNode.class.getCanonicalName());
|
||||
when(mockProcessorNode.getProcessGroup()).thenReturn(processGroup);
|
||||
|
||||
when(processGroup.findProcessor(PN_ID)).thenReturn(mockProcessorNode);
|
||||
|
||||
final ProcessorDTO processorDto = getProcessorDto();
|
||||
processorDto.setId(PN_ID);
|
||||
|
||||
processorDao.updateProcessor(processorDto);
|
||||
|
||||
verifyNoInteractions(auditService);
|
||||
}
|
||||
|
||||
private void assertUpdateActionDetailsFound(final ActionDetails actionDetails) {
|
||||
assertInstanceOf(FlowChangeConfigureDetails.class, actionDetails);
|
||||
final FlowChangeConfigureDetails flowChangeConfigureDetails = (FlowChangeConfigureDetails) actionDetails;
|
||||
|
||||
assertEquals("Comments", flowChangeConfigureDetails.getName());
|
||||
assertNotEquals("Comments", flowChangeConfigureDetails.getPreviousValue());
|
||||
}
|
||||
|
||||
private Action assertActionFound(final List<Action> actions, final Operation operation) {
|
||||
assertNotNull(actions);
|
||||
|
||||
final Optional<Action> actionFound = actions.stream().findFirst();
|
||||
assertTrue(actionFound.isPresent());
|
||||
|
||||
final Action action = actionFound.get();
|
||||
assertEquals(USER_IDENTITY, action.getUserIdentity());
|
||||
assertEquals(operation, action.getOperation());
|
||||
assertEquals(PN_ID, action.getSourceId());
|
||||
assertEquals(PN_SOURCE_NAME, action.getSourceName());
|
||||
assertEquals(Component.Processor, action.getSourceType());
|
||||
assertNotNull(action.getTimestamp());
|
||||
|
||||
return action;
|
||||
}
|
||||
|
||||
private ProcessorDTO getProcessorDto() {
|
||||
final ProcessorDTO processorDto = new ProcessorDTO();
|
||||
processorDto.setId(PROC_1);
|
||||
processorDto.setType("Processor");
|
||||
final BundleDTO bundleDto = new BundleDTO();
|
||||
bundleDto.setArtifact(BUNDLE_COORDINATE.getId());
|
||||
bundleDto.setGroup(BUNDLE_COORDINATE.getGroup());
|
||||
bundleDto.setVersion(BUNDLE_COORDINATE.getVersion());
|
||||
processorDto.setBundle(bundleDto);
|
||||
processorDto.setExtensionMissing(false);
|
||||
processorDto.setStyle(Collections.emptyMap());
|
||||
|
||||
return processorDto;
|
||||
}
|
||||
|
||||
private Bundle getBundle() {
|
||||
final BundleDetails bundleDetails = new BundleDetails.Builder()
|
||||
.coordinate(BUNDLE_COORDINATE)
|
||||
.workingDir(new File("."))
|
||||
.build();
|
||||
return new Bundle(bundleDetails, this.getClass().getClassLoader());
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableAspectJAutoProxy(proxyTargetClass = true)
|
||||
public static class AuditorConfiguration {
|
||||
|
||||
@Bean
|
||||
public ProcessorAuditor processorAuditor() {
|
||||
return new ProcessorAuditor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public StandardProcessorDAO processorDAO() {
|
||||
return new StandardProcessorDAO();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProcessGroupDAO processGroupDAO() {
|
||||
return new StandardProcessGroupDAO();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue