NIFI-986 Refactoring of action classes from nifi-user-actions to have interfaces in nifi-api, and adding getFlowChanges to EventAccess

- Fixing empty java docs and adding sort by id asc to the history query
- Changing userDn to userIdentity in Action and FlowChangeAction
- Modifying NiFiAuditor to always save events locally, and implementing getFlowChanges for ClusteredEventAccess
This commit is contained in:
Bryan Bende 2015-09-25 13:35:57 -04:00
parent c4f0cb1c6c
commit 5cc2b04b91
52 changed files with 731 additions and 320 deletions

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.action;
import org.apache.nifi.action.component.details.ComponentDetails;
import org.apache.nifi.action.details.ActionDetails;
import java.io.Serializable;
import java.util.Date;
/**
* An action taken on the flow by a user.
*/
public interface Action extends Serializable {
Integer getId();
Date getTimestamp();
String getUserIdentity();
String getUserName();
String getSourceId();
String getSourceName();
Component getSourceType();
ComponentDetails getComponentDetails();
Operation getOperation();
ActionDetails getActionDetails();
}

View File

@ -19,8 +19,8 @@ package org.apache.nifi.action.component.details;
import java.io.Serializable;
/**
*
* Base interface for providing component details to an Action.
*/
public abstract class ComponentDetails implements Serializable {
public interface ComponentDetails extends Serializable {
}

View File

@ -17,18 +17,10 @@
package org.apache.nifi.action.component.details;
/**
*
* Provides details of an extension on an Action.
*/
public class ExtensionDetails extends ComponentDetails {
public interface ExtensionDetails extends ComponentDetails {
private String type;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
String getType();
}

View File

@ -17,18 +17,10 @@
package org.apache.nifi.action.component.details;
/**
*
* Provides details of a remote process group to an Action.
*/
public class RemoteProcessGroupDetails extends ComponentDetails {
public interface RemoteProcessGroupDetails extends ComponentDetails {
private String uri;
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
String getUri();
}

View File

@ -19,8 +19,8 @@ package org.apache.nifi.action.details;
import java.io.Serializable;
/**
*
* Provides additional details about a given action.
*/
public abstract class ActionDetails implements Serializable {
public interface ActionDetails extends Serializable {
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.action.details;
/**
* Provides details about a configure action.
*/
public interface ConfigureDetails extends ActionDetails {
String getName();
String getPreviousValue();
String getValue();
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.action.details;
import org.apache.nifi.action.Component;
/**
* Provides details about a connect action.
*/
public interface ConnectDetails extends ActionDetails {
String getSourceId();
String getSourceName();
Component getSourceType();
String getDestinationId();
String getDestinationName();
Component getDestinationType();
String getRelationship();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.action.details;
/**
* Provides details about a move action.
*/
public interface MoveDetails extends ActionDetails {
String getGroup();
String getGroupId();
String getPreviousGroup();
String getPreviousGroupId();
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.action.details;
import java.util.Date;
/**
* Provides details about a purge action.
*/
public interface PurgeDetails extends ActionDetails {
Date getEndDate();
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.reporting;
import org.apache.nifi.action.Action;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@ -46,4 +47,16 @@ public interface EventAccess {
* @return the Provenance Event Repository
*/
ProvenanceEventRepository getProvenanceRepository();
/**
* Obtains flow changes starting with (and including) the given action ID. If no action
* exists with that ID, the first action to be returned will have an ID greater than
* <code>firstActionId</code>.
*
* @param firstActionId the id of the first action to obtain
* @param maxActions the maximum number of actions to obtain
* @return actions with ids greater than or equal to firstActionID, up to the max number of actions
*/
List<Action> getFlowChanges(int firstActionId, final int maxActions);
}

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.action.Action;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@ -29,6 +30,7 @@ public class MockEventAccess implements EventAccess {
private ProcessGroupStatus processGroupStatus;
private final List<ProvenanceEventRecord> provenanceRecords = new ArrayList<>();
private final List<Action> flowChanges = new ArrayList<>();
public void setProcessGroupStatus(final ProcessGroupStatus status) {
this.processGroupStatus = status;
@ -67,4 +69,29 @@ public class MockEventAccess implements EventAccess {
public ProvenanceEventRepository getProvenanceRepository() {
return null;
}
@Override
public List<Action> getFlowChanges(int firstActionId, int maxActions) {
if (firstActionId < 0 || maxActions < 1) {
throw new IllegalArgumentException();
}
final List<Action> actions = new ArrayList<>();
for (final Action action : flowChanges) {
if (action.getId() >= firstActionId) {
actions.add(action);
if (actions.size() >= maxActions) {
return actions;
}
}
}
return actions;
}
public void addFlowChange(final Action action) {
this.flowChanges.add(action);
}
}

View File

@ -16,13 +16,14 @@
*/
package org.apache.nifi.admin.dao;
import org.apache.nifi.action.Action;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.nifi.action.Action;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.History;
import org.apache.nifi.history.PreviousValue;
/**
* Action data access.
@ -33,9 +34,10 @@ public interface ActionDAO {
* Persists the specified action.
*
* @param action to persist
* @return the created Action with it's id
* @throws DataAccessException if unable to persist
*/
void createAction(Action action) throws DataAccessException;
Action createAction(Action action) throws DataAccessException;
/**
* Finds all actions that meet the specified criteria.

View File

@ -80,8 +80,9 @@ public interface UserDAO {
* Creates a new user based off the specified NiFiUser.
*
* @param user to create
* @return the created user with it's id
*/
void createUser(NiFiUser user) throws DataAccessException;
NiFiUser createUser(NiFiUser user) throws DataAccessException;
/**
* Updates the specified NiFiUser.

View File

@ -30,13 +30,20 @@ import java.util.List;
import java.util.Map;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.ComponentDetails;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeRemoteProcessGroupDetails;
import org.apache.nifi.action.component.details.RemoteProcessGroupDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.ConnectDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConnectDetails;
import org.apache.nifi.action.details.FlowChangeMoveDetails;
import org.apache.nifi.action.details.FlowChangePurgeDetails;
import org.apache.nifi.action.details.MoveDetails;
import org.apache.nifi.action.details.PurgeDetails;
import org.apache.nifi.admin.RepositoryUtils;
@ -195,8 +202,8 @@ public class StandardActionDAO implements ActionDAO {
}
@Override
public void createAction(Action action) throws DataAccessException {
if (action.getUserDn() == null) {
public Action createAction(Action action) throws DataAccessException {
if (action.getUserIdentity() == null) {
throw new IllegalArgumentException("User cannot be null.");
}
@ -209,7 +216,7 @@ public class StandardActionDAO implements ActionDAO {
try {
// obtain a statement to insert to the action table
statement = connection.prepareStatement(INSERT_ACTION, Statement.RETURN_GENERATED_KEYS);
statement.setString(1, StringUtils.left(action.getUserDn(), 255));
statement.setString(1, StringUtils.left(action.getUserIdentity(), 255));
statement.setString(2, StringUtils.left(action.getUserName(), 100));
statement.setString(3, action.getSourceId());
statement.setString(4, StringUtils.left(action.getSourceName(), 1000));
@ -220,10 +227,21 @@ public class StandardActionDAO implements ActionDAO {
// insert the action
int updateCount = statement.executeUpdate();
final FlowChangeAction createdAction = new FlowChangeAction();
createdAction.setUserIdentity(action.getUserIdentity());
createdAction.setUserName(action.getUserName());
createdAction.setSourceId(action.getSourceId());
createdAction.setSourceName(action.getSourceName());
createdAction.setSourceType(action.getSourceType());
createdAction.setOperation(action.getOperation());
createdAction.setTimestamp(action.getTimestamp());
createdAction.setActionDetails(action.getActionDetails());
createdAction.setComponentDetails(action.getComponentDetails());
// get the action id
rs = statement.getGeneratedKeys();
if (updateCount == 1 && rs.next()) {
action.setId(rs.getInt(1));
createdAction.setId(rs.getInt(1));
} else {
throw new DataAccessException("Unable to insert action.");
}
@ -232,25 +250,26 @@ public class StandardActionDAO implements ActionDAO {
statement.close();
// determine the type of component
ComponentDetails componentDetails = action.getComponentDetails();
if (componentDetails instanceof ExtensionDetails) {
createExtensionDetails(action.getId(), (ExtensionDetails) componentDetails);
} else if (componentDetails instanceof RemoteProcessGroupDetails) {
createRemoteProcessGroupDetails(action.getId(), (RemoteProcessGroupDetails) componentDetails);
ComponentDetails componentDetails = createdAction.getComponentDetails();
if (componentDetails instanceof FlowChangeExtensionDetails) {
createExtensionDetails(createdAction.getId(), (ExtensionDetails) componentDetails);
} else if (componentDetails instanceof FlowChangeRemoteProcessGroupDetails) {
createRemoteProcessGroupDetails(createdAction.getId(), (RemoteProcessGroupDetails) componentDetails);
}
// determine the type of action
ActionDetails details = action.getActionDetails();
if (details instanceof ConnectDetails) {
createConnectDetails(action.getId(), (ConnectDetails) details);
} else if (details instanceof MoveDetails) {
createMoveDetails(action.getId(), (MoveDetails) details);
} else if (details instanceof ConfigureDetails) {
createConfigureDetails(action.getId(), (ConfigureDetails) details);
} else if (details instanceof PurgeDetails) {
createPurgeDetails(action.getId(), (PurgeDetails) details);
ActionDetails details = createdAction.getActionDetails();
if (details instanceof FlowChangeConnectDetails) {
createConnectDetails(createdAction.getId(), (ConnectDetails) details);
} else if (details instanceof FlowChangeMoveDetails) {
createMoveDetails(createdAction.getId(), (MoveDetails) details);
} else if (details instanceof FlowChangeConfigureDetails) {
createConfigureDetails(createdAction.getId(), (ConfigureDetails) details);
} else if (details instanceof FlowChangePurgeDetails) {
createPurgeDetails(createdAction.getId(), (PurgeDetails) details);
}
return createdAction;
} catch (SQLException sqle) {
throw new DataAccessException(sqle);
} finally {
@ -540,9 +559,9 @@ public class StandardActionDAO implements ActionDAO {
final Operation operation = Operation.valueOf(rs.getString("OPERATION"));
final Component component = Component.valueOf(rs.getString("SOURCE_TYPE"));
Action action = new Action();
FlowChangeAction action = new FlowChangeAction();
action.setId(actionId);
action.setUserDn(rs.getString("USER_DN"));
action.setUserIdentity(rs.getString("USER_DN"));
action.setUserName(rs.getString("USER_NAME"));
action.setOperation(Operation.valueOf(rs.getString("OPERATION")));
action.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime()));
@ -597,7 +616,7 @@ public class StandardActionDAO implements ActionDAO {
@Override
public Action getAction(Integer actionId) throws DataAccessException {
Action action = null;
FlowChangeAction action = null;
PreparedStatement statement = null;
ResultSet rs = null;
try {
@ -614,9 +633,9 @@ public class StandardActionDAO implements ActionDAO {
Component component = Component.valueOf(rs.getString("SOURCE_TYPE"));
// populate the action
action = new Action();
action = new FlowChangeAction();
action.setId(rs.getInt("ID"));
action.setUserDn(rs.getString("USER_DN"));
action.setUserIdentity(rs.getString("USER_DN"));
action.setUserName(rs.getString("USER_NAME"));
action.setOperation(operation);
action.setTimestamp(new Date(rs.getTimestamp("ACTION_TIMESTAMP").getTime()));
@ -664,7 +683,7 @@ public class StandardActionDAO implements ActionDAO {
}
private ExtensionDetails getExtensionDetails(Integer actionId) throws DataAccessException {
ExtensionDetails extensionDetails = null;
FlowChangeExtensionDetails extensionDetails = null;
PreparedStatement statement = null;
ResultSet rs = null;
try {
@ -677,7 +696,7 @@ public class StandardActionDAO implements ActionDAO {
// ensure results
if (rs.next()) {
extensionDetails = new ExtensionDetails();
extensionDetails = new FlowChangeExtensionDetails();
extensionDetails.setType(rs.getString("TYPE"));
}
} catch (SQLException sqle) {
@ -691,7 +710,7 @@ public class StandardActionDAO implements ActionDAO {
}
private RemoteProcessGroupDetails getRemoteProcessGroupDetails(Integer actionId) throws DataAccessException {
RemoteProcessGroupDetails remoteProcessGroupDetails = null;
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = null;
PreparedStatement statement = null;
ResultSet rs = null;
try {
@ -704,7 +723,7 @@ public class StandardActionDAO implements ActionDAO {
// ensure results
if (rs.next()) {
remoteProcessGroupDetails = new RemoteProcessGroupDetails();
remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(rs.getString("URI"));
}
} catch (SQLException sqle) {
@ -718,7 +737,7 @@ public class StandardActionDAO implements ActionDAO {
}
private MoveDetails getMoveDetails(Integer actionId) throws DataAccessException {
MoveDetails moveDetails = null;
FlowChangeMoveDetails moveDetails = null;
PreparedStatement statement = null;
ResultSet rs = null;
try {
@ -731,7 +750,7 @@ public class StandardActionDAO implements ActionDAO {
// ensure results
if (rs.next()) {
moveDetails = new MoveDetails();
moveDetails = new FlowChangeMoveDetails();
moveDetails.setGroupId(rs.getString("GROUP_ID"));
moveDetails.setGroup(rs.getString("GROUP_NAME"));
moveDetails.setPreviousGroupId(rs.getString("PREVIOUS_GROUP_ID"));
@ -748,7 +767,7 @@ public class StandardActionDAO implements ActionDAO {
}
private ConnectDetails getConnectDetails(Integer actionId) throws DataAccessException {
ConnectDetails connectionDetails = null;
FlowChangeConnectDetails connectionDetails = null;
PreparedStatement statement = null;
ResultSet rs = null;
try {
@ -764,7 +783,7 @@ public class StandardActionDAO implements ActionDAO {
final Component sourceComponent = Component.valueOf(rs.getString("SOURCE_TYPE"));
final Component destinationComponent = Component.valueOf(rs.getString("DESTINATION_TYPE"));
connectionDetails = new ConnectDetails();
connectionDetails = new FlowChangeConnectDetails();
connectionDetails.setSourceId(rs.getString("SOURCE_ID"));
connectionDetails.setSourceName(rs.getString("SOURCE_NAME"));
connectionDetails.setSourceType(sourceComponent);
@ -784,7 +803,7 @@ public class StandardActionDAO implements ActionDAO {
}
private ConfigureDetails getConfigureDetails(Integer actionId) throws DataAccessException {
ConfigureDetails configurationDetails = null;
FlowChangeConfigureDetails configurationDetails = null;
PreparedStatement statement = null;
ResultSet rs = null;
try {
@ -797,7 +816,7 @@ public class StandardActionDAO implements ActionDAO {
// ensure results
if (rs.next()) {
configurationDetails = new ConfigureDetails();
configurationDetails = new FlowChangeConfigureDetails();
configurationDetails.setName(rs.getString("NAME"));
configurationDetails.setValue(rs.getString("VALUE"));
configurationDetails.setPreviousValue(rs.getString("PREVIOUS_VALUE"));
@ -813,7 +832,7 @@ public class StandardActionDAO implements ActionDAO {
}
private PurgeDetails getPurgeDetails(Integer actionId) throws DataAccessException {
PurgeDetails purgeDetails = null;
FlowChangePurgeDetails purgeDetails = null;
PreparedStatement statement = null;
ResultSet rs = null;
try {
@ -826,7 +845,7 @@ public class StandardActionDAO implements ActionDAO {
// ensure results
if (rs.next()) {
purgeDetails = new PurgeDetails();
purgeDetails = new FlowChangePurgeDetails();
purgeDetails.setEndDate(new Date(rs.getTimestamp("END_DATE").getTime()));
}
} catch (SQLException sqle) {

View File

@ -462,7 +462,7 @@ public class StandardUserDAO implements UserDAO {
}
@Override
public void createUser(NiFiUser user) throws DataAccessException {
public NiFiUser createUser(NiFiUser user) throws DataAccessException {
if (user.getDn() == null) {
throw new IllegalArgumentException("User dn must be specified.");
}
@ -493,6 +493,8 @@ public class StandardUserDAO implements UserDAO {
} else {
throw new DataAccessException("Unable to insert user.");
}
return user;
} catch (SQLException sqle) {
throw new DataAccessException(sqle);
} catch (DataAccessException dae) {

View File

@ -16,14 +16,15 @@
*/
package org.apache.nifi.admin.service;
import org.apache.nifi.action.Action;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.nifi.action.Action;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.History;
import org.apache.nifi.history.PreviousValue;
/**
* Allows NiFi actions to be audited.
@ -54,6 +55,15 @@ public interface AuditService {
*/
History getActions(HistoryQuery actionQuery);
/**
* Get the actions starting with firstActionId, returning up to maxActions.
*
* @param firstActionId the offset
* @param maxActions the number of actions to return
* @return history of actions matching the above conditions
*/
History getActions(final int firstActionId, final int maxActions);
/**
* Get the details for the specified action id. If the action cannot be
* found, null is returned.

View File

@ -16,12 +16,13 @@
*/
package org.apache.nifi.admin.service.action;
import java.util.Collection;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.dao.ActionDAO;
import org.apache.nifi.admin.dao.DAOFactory;
import org.apache.nifi.authorization.AuthorityProvider;
import java.util.Collection;
/**
* Adds the specified actions.
*/

View File

@ -16,12 +16,13 @@
*/
package org.apache.nifi.admin.service.action;
import java.util.Date;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.dao.ActionDAO;
import org.apache.nifi.admin.dao.DAOFactory;
import org.apache.nifi.authorization.AuthorityProvider;
import java.util.Date;
/**
* Purges actions up to a specified end date.
*/

View File

@ -16,12 +16,6 @@
*/
package org.apache.nifi.admin.service.impl;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.dao.DataAccessException;
import org.apache.nifi.admin.service.AdministrationException;
@ -40,6 +34,13 @@ import org.apache.nifi.history.PreviousValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*
*/
@ -140,6 +141,17 @@ public class StandardAuditService implements AuditService {
return history;
}
@Override
public History getActions(int firstActionId, int maxActions) {
final HistoryQuery query = new HistoryQuery();
query.setOffset(firstActionId);
query.setCount(maxActions);
query.setSortOrder("asc");
query.setSortColumn("timestamp");
return getActions(query);
}
@Override
public Action getAction(Integer actionId) {
Transaction transaction = null;

View File

@ -23,7 +23,7 @@
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
<!-- user authority provider -->
<bean id="authorityProvider" class="org.apache.nifi.authorization.AuthorityProviderFactoryBean" depends-on="clusterManager flowController">
<bean id="authorityProvider" class="org.apache.nifi.authorization.AuthorityProviderFactoryBean" depends-on="clusterManager">
<property name="properties" ref="nifiProperties"/>
</bean>

View File

@ -16,12 +16,11 @@
*/
package org.apache.nifi.cluster.manager.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.history.History;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@ -31,12 +30,18 @@ import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.reporting.EventAccess;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ClusteredEventAccess implements EventAccess {
private final WebClusterManager clusterManager;
private final AuditService auditService;
public ClusteredEventAccess(final WebClusterManager clusterManager) {
public ClusteredEventAccess(final WebClusterManager clusterManager, final AuditService auditService) {
this.clusterManager = clusterManager;
this.auditService = auditService;
}
@Override
@ -132,4 +137,10 @@ public class ClusteredEventAccess implements EventAccess {
}
};
}
@Override
public List<Action> getFlowChanges(int firstActionId, int maxActions) {
final History history = auditService.getActions(firstActionId, maxActions);
return new ArrayList<>(history.getActions());
}
}

View File

@ -1075,7 +1075,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler,
new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory);
new ClusteredEventAccess(this, auditService), bulletinRepository, controllerServiceProvider, validationContextFactory);
taskNode.setName(task.getClass().getSimpleName());
reportingTasks.put(id, taskNode);

View File

@ -16,39 +16,10 @@
*/
package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import com.sun.jersey.api.client.ClientHandlerException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
@ -139,6 +110,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.ControllerServiceLogObserver;
import org.apache.nifi.logging.LogLevel;
@ -202,7 +174,36 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.ClientHandlerException;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.Objects.requireNonNull;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider {
@ -243,6 +244,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final ControllerServiceProvider controllerServiceProvider;
private final UserService userService;
private final AuditService auditService;
private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
private final ComponentStatusRepository componentStatusRepository;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
@ -342,32 +344,36 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
public static FlowController createStandaloneInstance(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor) {
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final AuditService auditService,
final StringEncryptor encryptor) {
return new FlowController(
flowFileEventRepo,
properties,
userService,
encryptor,
/* configuredForClustering */ false,
/* NodeProtocolSender */ null);
flowFileEventRepo,
properties,
userService,
auditService,
encryptor,
/* configuredForClustering */ false,
/* NodeProtocolSender */ null);
}
public static FlowController createClusteredInstance(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor,
final NodeProtocolSender protocolSender) {
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final AuditService auditService,
final StringEncryptor encryptor,
final NodeProtocolSender protocolSender) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
properties,
userService,
encryptor,
/* configuredForClustering */ true,
/* NodeProtocolSender */ protocolSender);
flowFileEventRepo,
properties,
userService,
auditService,
encryptor,
/* configuredForClustering */ true,
/* NodeProtocolSender */ protocolSender);
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
@ -375,12 +381,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
private FlowController(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor,
final boolean configuredForClustering,
final NodeProtocolSender protocolSender) {
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final AuditService auditService,
final StringEncryptor encryptor,
final boolean configuredForClustering,
final NodeProtocolSender protocolSender) {
maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(5);
@ -428,6 +435,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
startConnectablesAfterInitialization = new ArrayList<>();
startRemoteGroupPortsAfterInitialization = new ArrayList<>();
this.userService = userService;
this.auditService = auditService;
final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
long shutdownSecs;
@ -3638,7 +3646,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public List<ProvenanceEventRecord> getProvenanceEvents(final long firstEventId, final int maxRecords) throws IOException {
return new ArrayList<ProvenanceEventRecord>(provenanceEventRepository.getEvents(firstEventId, maxRecords));
return new ArrayList<>(provenanceEventRepository.getEvents(firstEventId, maxRecords));
}
@Override
public List<Action> getFlowChanges(final int firstActionId, final int maxActions) {
final History history = auditService.getActions(firstActionId, maxActions);
return new ArrayList<>(history.getActions());
}
public void setClusterManagerRemoteSiteInfo(final Integer managerListeningPort, final Boolean commsSecure) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.spring;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.controller.FlowController;
@ -38,6 +39,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
private FlowController flowController;
private NiFiProperties properties;
private UserService userService;
private AuditService auditService;
private StringEncryptor encryptor;
@Override
@ -57,6 +59,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
flowFileEventRepository,
properties,
userService,
auditService,
encryptor,
nodeProtocolSender);
} else {
@ -64,6 +67,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
flowFileEventRepository,
properties,
userService,
auditService,
encryptor);
}
@ -98,4 +102,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
public void setEncryptor(final StringEncryptor encryptor) {
this.encryptor = encryptor;
}
public void setAuditService(final AuditService auditService) {
this.auditService = auditService;
}
}

View File

@ -37,6 +37,7 @@
<bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean">
<property name="properties" ref="nifiProperties"/>
<property name="userService" ref="userService" />
<property name="auditService" ref="auditService" />
<property name="encryptor" ref="stringEncryptor" />
</bean>

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
@ -55,6 +56,7 @@ public class StandardFlowServiceTest {
private NiFiProperties properties;
private FlowFileEventRepository mockFlowFileEventRepository;
private UserService mockUserService;
private AuditService mockAuditService;
private StringEncryptor mockEncryptor;
@BeforeClass
@ -67,7 +69,8 @@ public class StandardFlowServiceTest {
properties = NiFiProperties.getInstance();
mockFlowFileEventRepository = mock(FlowFileEventRepository.class);
mockUserService = mock(UserService.class);
flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, mockUserService, mockEncryptor);
mockAuditService = mock(AuditService.class);
flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, mockUserService, mockAuditService, mockEncryptor);
flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor);
}

View File

@ -21,4 +21,11 @@
<version>0.3.1-SNAPSHOT</version>
</parent>
<artifactId>nifi-user-actions</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.action;
import java.io.Serializable;
import org.apache.nifi.action.component.details.ComponentDetails;
import org.apache.nifi.action.details.ActionDetails;
import java.util.Date;
@ -24,10 +23,10 @@ import java.util.Date;
/**
*
*/
public class Action implements Serializable {
public class FlowChangeAction implements Action {
private Integer id;
private String userDn;
private String userIdentity;
private String userName;
private Date timestamp;
@ -39,6 +38,7 @@ public class Action implements Serializable {
private Operation operation;
private ActionDetails actionDetails;
@Override
public Integer getId() {
return id;
}
@ -47,6 +47,7 @@ public class Action implements Serializable {
this.id = id;
}
@Override
public Date getTimestamp() {
return timestamp;
}
@ -55,14 +56,16 @@ public class Action implements Serializable {
this.timestamp = timestamp;
}
public String getUserDn() {
return userDn;
@Override
public String getUserIdentity() {
return userIdentity;
}
public void setUserDn(String userDn) {
this.userDn = userDn;
public void setUserIdentity(String userIdentity) {
this.userIdentity = userIdentity;
}
@Override
public String getUserName() {
return userName;
}
@ -71,6 +74,7 @@ public class Action implements Serializable {
this.userName = userName;
}
@Override
public String getSourceId() {
return sourceId;
}
@ -79,6 +83,7 @@ public class Action implements Serializable {
this.sourceId = sourceId;
}
@Override
public String getSourceName() {
return sourceName;
}
@ -87,6 +92,7 @@ public class Action implements Serializable {
this.sourceName = sourceName;
}
@Override
public Component getSourceType() {
return sourceType;
}
@ -95,6 +101,7 @@ public class Action implements Serializable {
this.sourceType = sourceType;
}
@Override
public ComponentDetails getComponentDetails() {
return componentDetails;
}
@ -103,6 +110,7 @@ public class Action implements Serializable {
this.componentDetails = componentDetails;
}
@Override
public Operation getOperation() {
return operation;
}
@ -111,6 +119,7 @@ public class Action implements Serializable {
this.operation = operation;
}
@Override
public ActionDetails getActionDetails() {
return actionDetails;
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.action.component.details;
/**
*
*/
public class FlowChangeExtensionDetails implements ExtensionDetails {
private String type;
@Override
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.action.component.details;
/**
*
*/
public class FlowChangeRemoteProcessGroupDetails implements RemoteProcessGroupDetails {
private String uri;
@Override
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
}

View File

@ -19,12 +19,13 @@ package org.apache.nifi.action.details;
/**
*
*/
public class ConfigureDetails extends ActionDetails {
public class FlowChangeConfigureDetails implements ConfigureDetails {
private String name;
private String previousValue;
private String value;
@Override
public String getName() {
return name;
}
@ -33,6 +34,7 @@ public class ConfigureDetails extends ActionDetails {
this.name = name;
}
@Override
public String getPreviousValue() {
return previousValue;
}
@ -41,6 +43,7 @@ public class ConfigureDetails extends ActionDetails {
this.previousValue = previousValue;
}
@Override
public String getValue() {
return value;
}

View File

@ -21,7 +21,7 @@ import org.apache.nifi.action.Component;
/**
*
*/
public class ConnectDetails extends ActionDetails {
public class FlowChangeConnectDetails implements ConnectDetails {
private String sourceId;
private String sourceName;
@ -31,6 +31,7 @@ public class ConnectDetails extends ActionDetails {
private String destinationName;
private Component destinationType;
@Override
public String getSourceId() {
return sourceId;
}
@ -39,6 +40,7 @@ public class ConnectDetails extends ActionDetails {
this.sourceId = sourceId;
}
@Override
public String getSourceName() {
return sourceName;
}
@ -47,6 +49,7 @@ public class ConnectDetails extends ActionDetails {
this.sourceName = sourceName;
}
@Override
public Component getSourceType() {
return sourceType;
}
@ -55,6 +58,7 @@ public class ConnectDetails extends ActionDetails {
this.sourceType = sourceType;
}
@Override
public String getDestinationId() {
return destinationId;
}
@ -63,6 +67,7 @@ public class ConnectDetails extends ActionDetails {
this.destinationId = destinationId;
}
@Override
public String getDestinationName() {
return destinationName;
}
@ -71,6 +76,7 @@ public class ConnectDetails extends ActionDetails {
this.destinationName = destinationName;
}
@Override
public Component getDestinationType() {
return destinationType;
}
@ -79,6 +85,7 @@ public class ConnectDetails extends ActionDetails {
this.destinationType = destinationType;
}
@Override
public String getRelationship() {
return relationship;
}

View File

@ -19,13 +19,14 @@ package org.apache.nifi.action.details;
/**
*
*/
public class MoveDetails extends ActionDetails {
public class FlowChangeMoveDetails implements MoveDetails {
private String previousGroupId;
private String previousGroup;
private String groupId;
private String group;
@Override
public String getGroup() {
return group;
}
@ -34,6 +35,7 @@ public class MoveDetails extends ActionDetails {
this.group = group;
}
@Override
public String getGroupId() {
return groupId;
}
@ -42,6 +44,7 @@ public class MoveDetails extends ActionDetails {
this.groupId = groupId;
}
@Override
public String getPreviousGroup() {
return previousGroup;
}
@ -50,6 +53,7 @@ public class MoveDetails extends ActionDetails {
this.previousGroup = previousGroup;
}
@Override
public String getPreviousGroupId() {
return previousGroupId;
}

View File

@ -21,7 +21,7 @@ import java.util.Date;
/**
*
*/
public class PurgeDetails extends ActionDetails {
public class FlowChangePurgeDetails implements PurgeDetails {
private Date endDate;
@ -30,6 +30,7 @@ public class PurgeDetails extends ActionDetails {
*
* @return date at which the purge ends
*/
@Override
public Date getEndDate() {
return endDate;
}

View File

@ -21,8 +21,9 @@ import java.util.Collection;
import java.util.Date;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.web.controller.ControllerFacade;
@ -70,14 +71,14 @@ public class ControllerAuditor extends NiFiAuditor {
Collection<Action> actions = new ArrayList<>();
// create the configuration details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Controller Name");
configDetails.setValue(name);
configDetails.setPreviousValue(previousName);
// create the config action
Action configAction = new Action();
configAction.setUserDn(user.getDn());
FlowChangeAction configAction = new FlowChangeAction();
configAction.setUserIdentity(user.getDn());
configAction.setUserName(user.getUserName());
configAction.setOperation(Operation.Configure);
configAction.setTimestamp(new Date());
@ -123,14 +124,14 @@ public class ControllerAuditor extends NiFiAuditor {
Collection<Action> actions = new ArrayList<>();
// create the configuration details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Controller Comments");
configDetails.setValue(comments);
configDetails.setPreviousValue(previousComments);
// create the config action
Action configAction = new Action();
configAction.setUserDn(user.getDn());
FlowChangeAction configAction = new FlowChangeAction();
configAction.setUserIdentity(user.getDn());
configAction.setUserName(user.getUserName());
configAction.setOperation(Operation.Configure);
configAction.setTimestamp(new Date());
@ -176,14 +177,14 @@ public class ControllerAuditor extends NiFiAuditor {
Collection<Action> actions = new ArrayList<>();
// create the configure details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Controller Max Timer Driven Thread Count");
configDetails.setValue(String.valueOf(maxTimerDrivenThreadCount));
configDetails.setPreviousValue(String.valueOf(previousMaxTimerDrivenThreadCount));
// create the config action
Action configAction = new Action();
configAction.setUserDn(user.getDn());
FlowChangeAction configAction = new FlowChangeAction();
configAction.setUserIdentity(user.getDn());
configAction.setUserName(user.getUserName());
configAction.setOperation(Operation.Configure);
configAction.setTimestamp(new Date());
@ -229,14 +230,14 @@ public class ControllerAuditor extends NiFiAuditor {
Collection<Action> actions = new ArrayList<>();
// create the configure details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Controller Max Event Driven Thread Count");
configDetails.setValue(String.valueOf(maxEventDrivenThreadCount));
configDetails.setPreviousValue(String.valueOf(previousMaxEventDrivenThreadCount));
// create the config action
Action configAction = new Action();
configAction.setUserDn(user.getDn());
FlowChangeAction configAction = new FlowChangeAction();
configAction.setUserIdentity(user.getDn());
configAction.setUserName(user.getUserName());
configAction.setOperation(Operation.Configure);
configAction.setTimestamp(new Date());

View File

@ -25,10 +25,11 @@ import java.util.Set;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ProcessorNode;
@ -121,7 +122,7 @@ public class ControllerServiceAuditor extends NiFiAuditor {
Map<String, String> updatedValues = extractConfiguredPropertyValues(controllerService, controllerServiceDTO);
// create the controller service details
ExtensionDetails serviceDetails = new ExtensionDetails();
FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails();
serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
// create a controller service action
@ -159,14 +160,14 @@ public class ControllerServiceAuditor extends NiFiAuditor {
}
}
final ConfigureDetails actionDetails = new ConfigureDetails();
final FlowChangeConfigureDetails actionDetails = new FlowChangeConfigureDetails();
actionDetails.setName(property);
actionDetails.setValue(newValue);
actionDetails.setPreviousValue(oldValue);
// create a configuration action
Action configurationAction = new Action();
configurationAction.setUserDn(user.getDn());
FlowChangeAction configurationAction = new FlowChangeAction();
configurationAction.setUserIdentity(user.getDn());
configurationAction.setUserName(user.getUserName());
configurationAction.setOperation(operation);
configurationAction.setTimestamp(actionTimestamp);
@ -185,8 +186,8 @@ public class ControllerServiceAuditor extends NiFiAuditor {
// determine if the running state has changed and its not disabled
if (isDisabled != updateIsDisabled) {
// create a controller service action
Action serviceAction = new Action();
serviceAction.setUserDn(user.getDn());
FlowChangeAction serviceAction = new FlowChangeAction();
serviceAction.setUserIdentity(user.getDn());
serviceAction.setUserName(user.getUserName());
serviceAction.setTimestamp(new Date());
serviceAction.setSourceId(controllerService.getIdentifier());
@ -265,12 +266,12 @@ public class ControllerServiceAuditor extends NiFiAuditor {
final ProcessorNode processor = ((ProcessorNode) component);
// create the processor details
ExtensionDetails processorDetails = new ExtensionDetails();
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
// create a processor action
Action processorAction = new Action();
processorAction.setUserDn(user.getDn());
FlowChangeAction processorAction = new FlowChangeAction();
processorAction.setUserIdentity(user.getDn());
processorAction.setUserName(user.getUserName());
processorAction.setTimestamp(new Date());
processorAction.setSourceId(processor.getIdentifier());
@ -283,12 +284,12 @@ public class ControllerServiceAuditor extends NiFiAuditor {
final ReportingTaskNode reportingTask = ((ReportingTaskNode) component);
// create the reporting task details
ExtensionDetails processorDetails = new ExtensionDetails();
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName());
// create a reporting task action
Action reportingTaskAction = new Action();
reportingTaskAction.setUserDn(user.getDn());
FlowChangeAction reportingTaskAction = new FlowChangeAction();
reportingTaskAction.setUserIdentity(user.getDn());
reportingTaskAction.setUserName(user.getUserName());
reportingTaskAction.setTimestamp(new Date());
reportingTaskAction.setSourceId(reportingTask.getIdentifier());
@ -301,12 +302,12 @@ public class ControllerServiceAuditor extends NiFiAuditor {
final ControllerServiceNode controllerService = ((ControllerServiceNode) component);
// create the controller service details
ExtensionDetails serviceDetails = new ExtensionDetails();
FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails();
serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
// create a controller service action
Action serviceAction = new Action();
serviceAction.setUserDn(user.getDn());
FlowChangeAction serviceAction = new FlowChangeAction();
serviceAction.setUserIdentity(user.getDn());
serviceAction.setUserName(user.getUserName());
serviceAction.setTimestamp(new Date());
serviceAction.setSourceId(controllerService.getIdentifier());
@ -373,7 +374,7 @@ public class ControllerServiceAuditor extends NiFiAuditor {
* @return action
*/
private Action generateAuditRecord(ControllerServiceNode controllerService, Operation operation, ActionDetails actionDetails) {
Action action = null;
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -381,12 +382,12 @@ public class ControllerServiceAuditor extends NiFiAuditor {
// ensure the user was found
if (user != null) {
// create the controller service details
ExtensionDetails serviceDetails = new ExtensionDetails();
FlowChangeExtensionDetails serviceDetails = new FlowChangeExtensionDetails();
serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
// create the controller service action for adding this controller service
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(new Date());

View File

@ -20,6 +20,7 @@ import java.util.Date;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.connectable.Funnel;
@ -111,7 +112,7 @@ public class FunnelAuditor extends NiFiAuditor {
* @return action
*/
public Action generateAuditRecord(Funnel funnel, Operation operation, ActionDetails actionDetails) {
Action action = null;
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -119,8 +120,8 @@ public class FunnelAuditor extends NiFiAuditor {
// ensure the user was found
if (user != null) {
// create the action for adding this funnel
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(new Date());

View File

@ -19,6 +19,7 @@ package org.apache.nifi.audit;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.details.FlowChangeMoveDetails;
import org.apache.nifi.action.details.MoveDetails;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.cluster.context.ClusterContext;
@ -62,22 +63,22 @@ public abstract class NiFiAuditor {
// if we're a connected node, then put audit actions on threadlocal to propagate back to manager
if (ctx != null) {
ctx.getActions().addAll(actions);
} else {
// if we're the cluster manager, or a disconnected node, or running standalone, then audit actions
try {
// record the operations
auditService.addActions(actions);
} catch (Throwable t) {
logger.warn("Unable to record actions: " + t.getMessage());
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, t);
}
}
// always save the actions regardless of cluster or stand-alone
// all nodes in a cluster will have their own local copy without batching
try {
auditService.addActions(actions);
} catch (Throwable t) {
logger.warn("Unable to record actions: " + t.getMessage());
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, t);
}
}
}
protected MoveDetails createMoveDetails(String previousGroupId, String newGroupId, Logger logger) {
MoveDetails moveDetails = null;
FlowChangeMoveDetails moveDetails = null;
// get the groups in question
ProcessGroup previousGroup = processGroupDAO.getProcessGroup(previousGroupId);
@ -86,7 +87,7 @@ public abstract class NiFiAuditor {
// ensure the groups were found
if (previousGroup != null && newGroup != null) {
// create the move details
moveDetails = new MoveDetails();
moveDetails = new FlowChangeMoveDetails();
moveDetails.setPreviousGroupId(previousGroup.getIdentifier());
moveDetails.setPreviousGroup(previousGroup.getName());
moveDetails.setGroupId(newGroup.getIdentifier());

View File

@ -24,9 +24,10 @@ import java.util.Set;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ScheduledState;
@ -115,7 +116,7 @@ public class PortAuditor extends NiFiAuditor {
// see if the name has changed
if (name != null && portDTO.getName() != null && !name.equals(updatedPort.getName())) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Name");
configDetails.setValue(updatedPort.getName());
configDetails.setPreviousValue(name);
@ -126,7 +127,7 @@ public class PortAuditor extends NiFiAuditor {
// see if the comments has changed
if (comments != null && portDTO.getComments() != null && !comments.equals(updatedPort.getComments())) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Comments");
configDetails.setValue(updatedPort.getComments());
configDetails.setPreviousValue(comments);
@ -138,7 +139,7 @@ public class PortAuditor extends NiFiAuditor {
if (isRootGroupPort) {
if (portDTO.getConcurrentlySchedulableTaskCount() != null && updatedPort.getMaxConcurrentTasks() != maxConcurrentTasks) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Concurrent Tasks");
configDetails.setValue(String.valueOf(updatedPort.getMaxConcurrentTasks()));
configDetails.setPreviousValue(String.valueOf(maxConcurrentTasks));
@ -157,7 +158,7 @@ public class PortAuditor extends NiFiAuditor {
// if users were added/removed
if (newUsers.size() > 0 || removedUsers.size() > 0) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("User Access Control");
configDetails.setValue(StringUtils.join(portDTO.getUserAccessControl(), ", "));
configDetails.setPreviousValue(StringUtils.join(existingUsers, ", "));
@ -177,7 +178,7 @@ public class PortAuditor extends NiFiAuditor {
// if groups were added/removed
if (newGroups.size() > 0 || removedGroups.size() > 0) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Group Access Control");
configDetails.setValue(StringUtils.join(portDTO.getGroupAccessControl(), ", "));
configDetails.setPreviousValue(StringUtils.join(existingGroups, ", "));
@ -203,8 +204,8 @@ public class PortAuditor extends NiFiAuditor {
// create the actions
for (ActionDetails detail : configurationDetails) {
// create the port action for updating the name
Action portAction = new Action();
portAction.setUserDn(user.getDn());
FlowChangeAction portAction = new FlowChangeAction();
portAction.setUserIdentity(user.getDn());
portAction.setUserName(user.getUserName());
portAction.setOperation(Operation.Configure);
portAction.setTimestamp(timestamp);
@ -223,8 +224,8 @@ public class PortAuditor extends NiFiAuditor {
// determine if the running state has changed
if (scheduledState != updatedScheduledState) {
// create a processor action
Action processorAction = new Action();
processorAction.setUserDn(user.getDn());
FlowChangeAction processorAction = new FlowChangeAction();
processorAction.setUserIdentity(user.getDn());
processorAction.setUserName(user.getUserName());
processorAction.setTimestamp(new Date());
processorAction.setSourceId(updatedPort.getIdentifier());
@ -307,7 +308,7 @@ public class PortAuditor extends NiFiAuditor {
* @return action
*/
public Action generateAuditRecord(Port port, Operation operation, ActionDetails actionDetails) {
Action action = null;
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -321,8 +322,8 @@ public class PortAuditor extends NiFiAuditor {
}
// create the port action for adding this processor
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(new Date());

View File

@ -21,10 +21,11 @@ import java.util.Collection;
import java.util.Date;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.MoveDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.action.details.FlowChangeMoveDetails;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.apache.nifi.user.NiFiUser;
@ -103,7 +104,7 @@ public class ProcessGroupAuditor extends NiFiAuditor {
// see if the name has changed
if (name != null && updatedProcessGroup.getName() != null && !name.equals(updatedProcessGroup.getName())) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("name");
configDetails.setValue(updatedProcessGroup.getName());
configDetails.setPreviousValue(name);
@ -114,7 +115,7 @@ public class ProcessGroupAuditor extends NiFiAuditor {
// see if the comments has changed
if (comments != null && updatedProcessGroup.getComments() != null && !comments.equals(updatedProcessGroup.getComments())) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("comments");
configDetails.setValue(updatedProcessGroup.getComments());
configDetails.setPreviousValue(comments);
@ -133,13 +134,13 @@ public class ProcessGroupAuditor extends NiFiAuditor {
for (ActionDetails detail : details) {
// determine the type of operation being performed
Operation operation = Operation.Configure;
if (detail instanceof MoveDetails) {
if (detail instanceof FlowChangeMoveDetails) {
operation = Operation.Move;
}
// create the port action for updating the name
Action processGroupAction = new Action();
processGroupAction.setUserDn(user.getDn());
FlowChangeAction processGroupAction = new FlowChangeAction();
processGroupAction.setUserIdentity(user.getDn());
processGroupAction.setUserName(user.getUserName());
processGroupAction.setOperation(operation);
processGroupAction.setTimestamp(timestamp);
@ -155,8 +156,8 @@ public class ProcessGroupAuditor extends NiFiAuditor {
// if the user was starting/stopping this process group
if (processGroupDTO.isRunning() != null) {
// create a process group action
Action processGroupAction = new Action();
processGroupAction.setUserDn(user.getDn());
FlowChangeAction processGroupAction = new FlowChangeAction();
processGroupAction.setUserIdentity(user.getDn());
processGroupAction.setUserName(user.getUserName());
processGroupAction.setSourceId(processGroup.getIdentifier());
processGroupAction.setSourceName(processGroup.getName());
@ -231,7 +232,7 @@ public class ProcessGroupAuditor extends NiFiAuditor {
* @return action
*/
public Action generateAuditRecord(ProcessGroup processGroup, Operation operation, ActionDetails actionDetails) {
Action action = null;
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -240,8 +241,8 @@ public class ProcessGroupAuditor extends NiFiAuditor {
if (user != null) {
// create the process group action for adding this process group
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(new Date());

View File

@ -29,10 +29,11 @@ import java.util.Set;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@ -131,7 +132,7 @@ public class ProcessorAuditor extends NiFiAuditor {
Map<String, String> updatedValues = extractConfiguredPropertyValues(processor, processorDTO);
// create the processor details
ExtensionDetails processorDetails = new ExtensionDetails();
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
// create a processor action
@ -169,14 +170,14 @@ public class ProcessorAuditor extends NiFiAuditor {
}
}
final ConfigureDetails actionDetails = new ConfigureDetails();
final FlowChangeConfigureDetails actionDetails = new FlowChangeConfigureDetails();
actionDetails.setName(property);
actionDetails.setValue(newValue);
actionDetails.setPreviousValue(oldValue);
// create a configuration action
Action configurationAction = new Action();
configurationAction.setUserDn(user.getDn());
FlowChangeAction configurationAction = new FlowChangeAction();
configurationAction.setUserIdentity(user.getDn());
configurationAction.setUserName(user.getUserName());
configurationAction.setOperation(operation);
configurationAction.setTimestamp(actionTimestamp);
@ -195,8 +196,8 @@ public class ProcessorAuditor extends NiFiAuditor {
// determine if the running state has changed and its not disabled
if (scheduledState != updatedScheduledState) {
// create a processor action
Action processorAction = new Action();
processorAction.setUserDn(user.getDn());
FlowChangeAction processorAction = new FlowChangeAction();
processorAction.setUserIdentity(user.getDn());
processorAction.setUserName(user.getUserName());
processorAction.setTimestamp(new Date());
processorAction.setSourceId(processor.getIdentifier());
@ -280,7 +281,7 @@ public class ProcessorAuditor extends NiFiAuditor {
* @return action
*/
public Action generateAuditRecord(ProcessorNode processor, Operation operation, ActionDetails actionDetails) {
Action action = null;
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -288,12 +289,12 @@ public class ProcessorAuditor extends NiFiAuditor {
// ensure the user was found
if (user != null) {
// create the processor details
ExtensionDetails processorDetails = new ExtensionDetails();
FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
// create the processor action for adding this processor
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(new Date());

View File

@ -26,10 +26,12 @@ import java.util.Map;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.ConnectDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConnectDetails;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
@ -179,14 +181,14 @@ public class RelationshipAuditor extends NiFiAuditor {
// ensure the value is changing
if (oldValue == null || newValue == null || !newValue.equals(oldValue)) {
// create the config details
ConfigureDetails configurationDetails = new ConfigureDetails();
FlowChangeConfigureDetails configurationDetails = new FlowChangeConfigureDetails();
configurationDetails.setName(property);
configurationDetails.setValue(newValue);
configurationDetails.setPreviousValue(oldValue);
// create a configuration action
Action configurationAction = new Action();
configurationAction.setUserDn(user.getDn());
FlowChangeAction configurationAction = new FlowChangeAction();
configurationAction.setUserIdentity(user.getDn());
configurationAction.setUserName(user.getUserName());
configurationAction.setOperation(Operation.Configure);
configurationAction.setTimestamp(actionTimestamp);
@ -263,7 +265,7 @@ public class RelationshipAuditor extends NiFiAuditor {
final String formattedRelationships = relationshipNames.isEmpty() ? StringUtils.EMPTY : StringUtils.join(relationshipNames, ", ");
// create the connect details
final ConnectDetails connectDetails = new ConnectDetails();
final FlowChangeConnectDetails connectDetails = new FlowChangeConnectDetails();
connectDetails.setSourceId(source.getIdentifier());
connectDetails.setSourceName(source.getName());
connectDetails.setSourceType(sourceType);
@ -327,7 +329,7 @@ public class RelationshipAuditor extends NiFiAuditor {
* @return action
*/
public Action generateAuditRecordForConnection(Connection connection, Operation operation, ActionDetails actionDetails) {
Action action = null;
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -350,8 +352,8 @@ public class RelationshipAuditor extends NiFiAuditor {
Date actionTimestamp = new Date();
// create a new relationship action
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(actionTimestamp);

View File

@ -23,10 +23,11 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.RemoteProcessGroupDetails;
import org.apache.nifi.action.component.details.FlowChangeRemoteProcessGroupDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.security.user.NiFiUserUtils;
@ -123,7 +124,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// see if the communications timeout has changed
if (remoteProcessGroupDTO.getCommunicationsTimeout() != null && !updatedRemoteProcessGroup.getCommunicationsTimeout().equals(communicationsTimeout)) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Communications Timeout");
configDetails.setValue(updatedRemoteProcessGroup.getCommunicationsTimeout());
configDetails.setPreviousValue(communicationsTimeout);
@ -134,7 +135,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// see if the yield duration has changed
if (remoteProcessGroupDTO.getYieldDuration() != null && !updatedRemoteProcessGroup.getYieldDuration().equals(yieldDuration)) {
// create the config details
ConfigureDetails configDetails = new ConfigureDetails();
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Yield Duration");
configDetails.setValue(updatedRemoteProcessGroup.getYieldDuration());
configDetails.setPreviousValue(yieldDuration);
@ -162,7 +163,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
final Integer previousConcurrentTasks = concurrentTasks.get(remotePortDTO.getId());
if (previousConcurrentTasks != null && remotePort.getMaxConcurrentTasks() != previousConcurrentTasks) {
// create the config details
ConfigureDetails concurrentTasksDetails = new ConfigureDetails();
FlowChangeConfigureDetails concurrentTasksDetails = new FlowChangeConfigureDetails();
concurrentTasksDetails.setName("Concurrent Tasks");
concurrentTasksDetails.setValue(String.valueOf(remotePort.getMaxConcurrentTasks()));
concurrentTasksDetails.setPreviousValue(String.valueOf(previousConcurrentTasks));
@ -177,7 +178,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
final Boolean previousCompression = compression.get(remotePortDTO.getId());
if (previousCompression != null && remotePort.isUseCompression() != previousCompression) {
// create the config details
ConfigureDetails compressionDetails = new ConfigureDetails();
FlowChangeConfigureDetails compressionDetails = new FlowChangeConfigureDetails();
compressionDetails.setName("Compressed");
compressionDetails.setValue(String.valueOf(remotePort.isUseCompression()));
compressionDetails.setPreviousValue(String.valueOf(previousCompression));
@ -204,7 +205,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
final Integer previousConcurrentTasks = concurrentTasks.get(remotePortDTO.getId());
if (previousConcurrentTasks != null && remotePort.getMaxConcurrentTasks() != previousConcurrentTasks) {
// create the config details
ConfigureDetails concurrentTasksDetails = new ConfigureDetails();
FlowChangeConfigureDetails concurrentTasksDetails = new FlowChangeConfigureDetails();
concurrentTasksDetails.setName("Concurrent Tasks");
concurrentTasksDetails.setValue(String.valueOf(remotePort.getMaxConcurrentTasks()));
concurrentTasksDetails.setPreviousValue(String.valueOf(previousConcurrentTasks));
@ -219,7 +220,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
final Boolean previousCompression = compression.get(remotePortDTO.getId());
if (previousCompression != null && remotePort.isUseCompression() != previousCompression) {
// create the config details
ConfigureDetails compressionDetails = new ConfigureDetails();
FlowChangeConfigureDetails compressionDetails = new FlowChangeConfigureDetails();
compressionDetails.setName("Compressed");
compressionDetails.setValue(String.valueOf(remotePort.isUseCompression()));
compressionDetails.setPreviousValue(String.valueOf(previousCompression));
@ -234,7 +235,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
Collection<Action> actions = new ArrayList<>();
// create the remote process group details
RemoteProcessGroupDetails remoteProcessGroupDetails = new RemoteProcessGroupDetails();
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri().toString());
// save the actions if necessary
@ -244,8 +245,8 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// create the actions
for (ActionDetails detail : details) {
// create the port action for updating the name
Action remoteProcessGroupAction = new Action();
remoteProcessGroupAction.setUserDn(user.getDn());
FlowChangeAction remoteProcessGroupAction = new FlowChangeAction();
remoteProcessGroupAction.setUserIdentity(user.getDn());
remoteProcessGroupAction.setUserName(user.getUserName());
remoteProcessGroupAction.setOperation(Operation.Configure);
remoteProcessGroupAction.setTimestamp(timestamp);
@ -265,8 +266,8 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// determine if the running state has changed
if (transmissionState != updatedTransmissionState) {
// create a processor action
Action remoteProcessGroupAction = new Action();
remoteProcessGroupAction.setUserDn(user.getDn());
FlowChangeAction remoteProcessGroupAction = new FlowChangeAction();
remoteProcessGroupAction.setUserIdentity(user.getDn());
remoteProcessGroupAction.setUserName(user.getUserName());
remoteProcessGroupAction.setTimestamp(new Date());
remoteProcessGroupAction.setSourceId(updatedRemoteProcessGroup.getIdentifier());
@ -342,7 +343,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
* @return action
*/
public Action generateAuditRecord(RemoteProcessGroup remoteProcessGroup, Operation operation, ActionDetails actionDetails) {
Action action = null;
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -350,12 +351,12 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// ensure the user was found
if (user != null) {
// create the remote process group details
RemoteProcessGroupDetails remoteProcessGroupDetails = new RemoteProcessGroupDetails();
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri().toString());
// create the remote process group action
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(new Date());

View File

@ -25,10 +25,11 @@ import java.util.Set;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
@ -114,7 +115,7 @@ public class ReportingTaskAuditor extends NiFiAuditor {
Map<String, String> updatedValues = extractConfiguredPropertyValues(reportingTask, reportingTaskDTO);
// create the reporting task details
ExtensionDetails taskDetails = new ExtensionDetails();
FlowChangeExtensionDetails taskDetails = new FlowChangeExtensionDetails();
taskDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName());
// create a reporting task action
@ -152,14 +153,14 @@ public class ReportingTaskAuditor extends NiFiAuditor {
}
}
final ConfigureDetails actionDetails = new ConfigureDetails();
final FlowChangeConfigureDetails actionDetails = new FlowChangeConfigureDetails();
actionDetails.setName(property);
actionDetails.setValue(newValue);
actionDetails.setPreviousValue(oldValue);
// create a configuration action
Action configurationAction = new Action();
configurationAction.setUserDn(user.getDn());
FlowChangeAction configurationAction = new FlowChangeAction();
configurationAction.setUserIdentity(user.getDn());
configurationAction.setUserName(user.getUserName());
configurationAction.setOperation(operation);
configurationAction.setTimestamp(actionTimestamp);
@ -178,8 +179,8 @@ public class ReportingTaskAuditor extends NiFiAuditor {
// determine if the running state has changed and its not disabled
if (scheduledState != updatedScheduledState) {
// create a reporting task action
Action taskAction = new Action();
taskAction.setUserDn(user.getDn());
FlowChangeAction taskAction = new FlowChangeAction();
taskAction.setUserIdentity(user.getDn());
taskAction.setUserName(user.getUserName());
taskAction.setTimestamp(new Date());
taskAction.setSourceId(reportingTask.getIdentifier());
@ -262,7 +263,7 @@ public class ReportingTaskAuditor extends NiFiAuditor {
* @return action
*/
public Action generateAuditRecord(ReportingTaskNode reportingTask, Operation operation, ActionDetails actionDetails) {
Action action = null;
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -270,12 +271,12 @@ public class ReportingTaskAuditor extends NiFiAuditor {
// ensure the user was found
if (user != null) {
// create the reporting task details
ExtensionDetails taskDetails = new ExtensionDetails();
FlowChangeExtensionDetails taskDetails = new FlowChangeExtensionDetails();
taskDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName());
// create the reporting task action for adding this reporting task
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(new Date());

View File

@ -24,10 +24,12 @@ import java.util.Set;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.component.details.RemoteProcessGroupDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeRemoteProcessGroupDetails;
import org.apache.nifi.action.details.ConnectDetails;
import org.apache.nifi.action.details.FlowChangeConnectDetails;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
@ -136,10 +138,10 @@ public class SnippetAuditor extends NiFiAuditor {
// remote processor groups
for (final RemoteProcessGroupDTO remoteProcessGroup : snippet.getRemoteProcessGroups()) {
RemoteProcessGroupDetails remoteProcessGroupDetails = new RemoteProcessGroupDetails();
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri());
final Action action = generateAuditRecord(remoteProcessGroup.getId(), remoteProcessGroup.getName(), Component.RemoteProcessGroup, Operation.Add, timestamp);
final FlowChangeAction action = generateAuditRecord(remoteProcessGroup.getId(), remoteProcessGroup.getName(), Component.RemoteProcessGroup, Operation.Add, timestamp);
action.setComponentDetails(remoteProcessGroupDetails);
actions.add(action);
}
@ -151,10 +153,10 @@ public class SnippetAuditor extends NiFiAuditor {
// processors
for (final ProcessorDTO processor : snippet.getProcessors()) {
final ExtensionDetails processorDetails = new ExtensionDetails();
final FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(StringUtils.substringAfterLast(processor.getType(), "."));
final Action action = generateAuditRecord(processor.getId(), processor.getName(), Component.Processor, Operation.Add, timestamp);
final FlowChangeAction action = generateAuditRecord(processor.getId(), processor.getName(), Component.Processor, Operation.Add, timestamp);
action.setComponentDetails(processorDetails);
actions.add(action);
}
@ -174,7 +176,7 @@ public class SnippetAuditor extends NiFiAuditor {
final String name = StringUtils.isBlank(connection.getName()) ? relationships : connection.getName();
// create the connect details
ConnectDetails connectDetails = new ConnectDetails();
FlowChangeConnectDetails connectDetails = new FlowChangeConnectDetails();
connectDetails.setSourceId(source.getId());
connectDetails.setSourceName(source.getName());
connectDetails.setSourceType(determineConnectableType(source));
@ -184,7 +186,7 @@ public class SnippetAuditor extends NiFiAuditor {
connectDetails.setDestinationType(determineConnectableType(destination));
// create the audit record
final Action action = generateAuditRecord(connection.getId(), name, Component.Connection, Operation.Connect, timestamp);
final FlowChangeAction action = generateAuditRecord(connection.getId(), name, Component.Connection, Operation.Connect, timestamp);
action.setActionDetails(connectDetails);
actions.add(action);
}
@ -220,8 +222,8 @@ public class SnippetAuditor extends NiFiAuditor {
/**
* Generates an audit record for the creation of the specified funnel.
*/
private Action generateAuditRecord(String id, String name, Component type, Operation operation, Date timestamp) {
Action action = null;
private FlowChangeAction generateAuditRecord(String id, String name, Component type, Operation operation, Date timestamp) {
FlowChangeAction action = null;
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -229,8 +231,8 @@ public class SnippetAuditor extends NiFiAuditor {
// ensure the user was found
if (user != null) {
// create the action for adding this funnel
action = new Action();
action.setUserDn(user.getDn());
action = new FlowChangeAction();
action.setUserIdentity(user.getDn());
action.setUserName(user.getUserName());
action.setOperation(operation);
action.setTimestamp(timestamp);

View File

@ -37,8 +37,9 @@ import javax.ws.rs.WebApplicationException;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.PurgeDetails;
import org.apache.nifi.action.details.FlowChangePurgeDetails;
import org.apache.nifi.admin.service.AccountNotFoundException;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.UserService;
@ -1765,12 +1766,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
// create the purge details
PurgeDetails details = new PurgeDetails();
FlowChangePurgeDetails details = new FlowChangePurgeDetails();
details.setEndDate(endDate);
// create a purge action to record that records are being removed
Action purgeAction = new Action();
purgeAction.setUserDn(user.getDn());
FlowChangeAction purgeAction = new FlowChangeAction();
purgeAction.setUserIdentity(user.getDn());
purgeAction.setUserName(user.getUserName());
purgeAction.setOperation(Operation.Purge);
purgeAction.setTimestamp(new Date());

View File

@ -34,9 +34,10 @@ import javax.ws.rs.core.Response;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
@ -121,21 +122,21 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
final Date now = new Date();
final Collection<Action> actions = new HashSet<>(configurationActions.size());
for (final ConfigurationAction configurationAction : configurationActions) {
final ExtensionDetails extensionDetails = new ExtensionDetails();
final FlowChangeExtensionDetails extensionDetails = new FlowChangeExtensionDetails();
extensionDetails.setType(configurationAction.getType());
final ConfigureDetails configureDetails = new ConfigureDetails();
final FlowChangeConfigureDetails configureDetails = new FlowChangeConfigureDetails();
configureDetails.setName(configurationAction.getName());
configureDetails.setPreviousValue(configurationAction.getPreviousValue());
configureDetails.setValue(configurationAction.getValue());
final Action action = new Action();
final FlowChangeAction action = new FlowChangeAction();
action.setTimestamp(now);
action.setSourceId(configurationAction.getId());
action.setSourceName(configurationAction.getName());
action.setSourceType(componentType);
action.setOperation(Operation.Configure);
action.setUserDn(getCurrentUserDn());
action.setUserIdentity(getCurrentUserDn());
action.setUserName(getCurrentUserName());
action.setComponentDetails(extensionDetails);
action.setActionDetails(configureDetails);

View File

@ -34,9 +34,10 @@ import javax.ws.rs.core.Response;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
@ -94,21 +95,21 @@ public class StandardNiFiWebContext implements NiFiWebContext {
final Date now = new Date();
final Collection<Action> actions = new HashSet<>(processorActions.size());
for (final ProcessorConfigurationAction processorAction : processorActions) {
final ExtensionDetails processorDetails = new ExtensionDetails();
final FlowChangeExtensionDetails processorDetails = new FlowChangeExtensionDetails();
processorDetails.setType(processorAction.getProcessorType());
final ConfigureDetails configureDetails = new ConfigureDetails();
final FlowChangeConfigureDetails configureDetails = new FlowChangeConfigureDetails();
configureDetails.setName(processorAction.getName());
configureDetails.setPreviousValue(processorAction.getPreviousValue());
configureDetails.setValue(processorAction.getValue());
final Action action = new Action();
final FlowChangeAction action = new FlowChangeAction();
action.setTimestamp(now);
action.setSourceId(processorAction.getProcessorId());
action.setSourceName(processorAction.getProcessorName());
action.setSourceType(Component.Processor);
action.setOperation(Operation.Configure);
action.setUserDn(getCurrentUserDn());
action.setUserIdentity(getCurrentUserDn());
action.setUserName(getCurrentUserName());
action.setComponentDetails(processorDetails);
action.setActionDetails(configureDetails);

View File

@ -40,6 +40,7 @@ import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriBuilderException;
import javax.ws.rs.core.UriInfo;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
@ -203,12 +204,12 @@ public abstract class ApplicationResource {
clusterCtx.getActions().clear();
// create the batch action
Action batchAction = new Action();
FlowChangeAction batchAction = new FlowChangeAction();
batchAction.setOperation(Operation.Batch);
// copy values from prototype action
batchAction.setTimestamp(prototypeAction.getTimestamp());
batchAction.setUserDn(prototypeAction.getUserDn());
batchAction.setUserIdentity(prototypeAction.getUserIdentity());
batchAction.setUserName(prototypeAction.getUserName());
batchAction.setSourceId(prototypeAction.getSourceId());
batchAction.setSourceName(prototypeAction.getSourceName());

View File

@ -40,10 +40,16 @@ import javax.ws.rs.WebApplicationException;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.component.details.ComponentDetails;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeRemoteProcessGroupDetails;
import org.apache.nifi.action.component.details.RemoteProcessGroupDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.ConnectDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConnectDetails;
import org.apache.nifi.action.details.FlowChangeMoveDetails;
import org.apache.nifi.action.details.FlowChangePurgeDetails;
import org.apache.nifi.action.details.MoveDetails;
import org.apache.nifi.action.details.PurgeDetails;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -156,7 +162,7 @@ public final class DtoFactory {
actionDto.setSourceName(action.getSourceName());
actionDto.setSourceType(action.getSourceType().name());
actionDto.setTimestamp(action.getTimestamp());
actionDto.setUserDn(action.getUserDn());
actionDto.setUserDn(action.getUserIdentity());
actionDto.setUserName(action.getUserName());
actionDto.setOperation(action.getOperation().name());
actionDto.setActionDetails(createActionDetailsDto(action.getActionDetails()));
@ -176,13 +182,13 @@ public final class DtoFactory {
return null;
}
if (actionDetails instanceof ConfigureDetails) {
if (actionDetails instanceof FlowChangeConfigureDetails) {
final ConfigureDetailsDTO configureDetails = new ConfigureDetailsDTO();
configureDetails.setName(((ConfigureDetails) actionDetails).getName());
configureDetails.setPreviousValue(((ConfigureDetails) actionDetails).getPreviousValue());
configureDetails.setValue(((ConfigureDetails) actionDetails).getValue());
return configureDetails;
} else if (actionDetails instanceof ConnectDetails) {
} else if (actionDetails instanceof FlowChangeConnectDetails) {
final ConnectDetailsDTO connectDetails = new ConnectDetailsDTO();
connectDetails.setSourceId(((ConnectDetails) actionDetails).getSourceId());
connectDetails.setSourceName(((ConnectDetails) actionDetails).getSourceName());
@ -192,14 +198,14 @@ public final class DtoFactory {
connectDetails.setDestinationName(((ConnectDetails) actionDetails).getDestinationName());
connectDetails.setDestinationType(((ConnectDetails) actionDetails).getDestinationType().toString());
return connectDetails;
} else if (actionDetails instanceof MoveDetails) {
} else if (actionDetails instanceof FlowChangeMoveDetails) {
final MoveDetailsDTO moveDetails = new MoveDetailsDTO();
moveDetails.setPreviousGroup(((MoveDetails) actionDetails).getPreviousGroup());
moveDetails.setPreviousGroupId(((MoveDetails) actionDetails).getPreviousGroupId());
moveDetails.setGroup(((MoveDetails) actionDetails).getGroup());
moveDetails.setGroupId(((MoveDetails) actionDetails).getGroupId());
return moveDetails;
} else if (actionDetails instanceof PurgeDetails) {
} else if (actionDetails instanceof FlowChangePurgeDetails) {
final PurgeDetailsDTO purgeDetails = new PurgeDetailsDTO();
purgeDetails.setEndDate(((PurgeDetails) actionDetails).getEndDate());
return purgeDetails;
@ -219,11 +225,11 @@ public final class DtoFactory {
return null;
}
if (componentDetails instanceof ExtensionDetails) {
if (componentDetails instanceof FlowChangeExtensionDetails) {
final ExtensionDetailsDTO processorDetails = new ExtensionDetailsDTO();
processorDetails.setType(((ExtensionDetails) componentDetails).getType());
return processorDetails;
} else if (componentDetails instanceof RemoteProcessGroupDetails) {
} else if (componentDetails instanceof FlowChangeRemoteProcessGroupDetails) {
final RemoteProcessGroupDetailsDTO remoteProcessGroupDetails = new RemoteProcessGroupDetailsDTO();
remoteProcessGroupDetails.setUri(((RemoteProcessGroupDetails) componentDetails).getUri());
return remoteProcessGroupDetails;