NIFI-3162: Audit RPG and RPG port config changes.

- Added configure audits for Transport Protocol, HTTP Proxy Server Host,
  Port, User and Password in RemoteProcessGroup configuration
- Added configure audits for enabling/disabling individual remote port
- Added configure audits for Concurrent Tasks and Compressed in Remote
  Port configuration
- This closes #1476
This commit is contained in:
Koji Kawamura 2017-02-07 10:11:30 +09:00 committed by Matt Gilman
parent 93150d3efa
commit 5af6eb17b0
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 830 additions and 147 deletions

View File

@ -22,12 +22,13 @@ import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.FlowChangeRemoteProcessGroupDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
@ -38,10 +39,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK;
/**
* Audits remote process group creation/removal and configuration changes.
@ -51,6 +59,44 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
private static final Logger logger = LoggerFactory.getLogger(RemoteProcessGroupAuditor.class);
// Proxy settings should be able to be null cleared, so the necessity of checking those properties depend on
// whether transport protocol is specified.
// See StandardRemoteProcessGroupDAO.updateRemoteProcessGroup for detail.
private static final Function<RemoteProcessGroupDTO, Boolean> IS_TRANSPORT_PROTOCOL_SET = dto -> dto.getTransportProtocol() != null;
private static final List<ConfigurationRecorder<RemoteProcessGroup, RemoteProcessGroupDTO>> CONFIG_RECORDERS = Arrays.asList(
new ConfigurationRecorder<RemoteProcessGroup, RemoteProcessGroupDTO>("Communications Timeout",
dto -> dto.getCommunicationsTimeout() != null, RemoteProcessGroup::getCommunicationsTimeout),
new ConfigurationRecorder<RemoteProcessGroup, RemoteProcessGroupDTO>("Yield Duration",
dto -> dto.getYieldDuration() != null, RemoteProcessGroup::getYieldDuration),
new ConfigurationRecorder<RemoteProcessGroup, RemoteProcessGroupDTO>("Transport Protocol",
IS_TRANSPORT_PROTOCOL_SET, rpg -> rpg.getTransportProtocol().name()),
new ConfigurationRecorder<>("Proxy Host",
IS_TRANSPORT_PROTOCOL_SET, RemoteProcessGroup::getProxyHost),
new ConfigurationRecorder<>("Proxy Port",
IS_TRANSPORT_PROTOCOL_SET, RemoteProcessGroup::getProxyPort),
new ConfigurationRecorder<>("Proxy User",
IS_TRANSPORT_PROTOCOL_SET, RemoteProcessGroup::getProxyUser),
new ConfigurationRecorder<>("Proxy Password",
IS_TRANSPORT_PROTOCOL_SET, RemoteProcessGroup::getProxyPassword)
.setConvertRawValue(v -> StringUtils.isEmpty(v) ? "" : SENSITIVE_VALUE_MASK)
);
private static final BiFunction<RemoteGroupPort, String, String> PORT_NAME_CONVERT = (updated, name) -> updated.getName() + "." + name;
private static final List<ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>> PORT_CONFIG_RECORDERS = Arrays.asList(
new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Transmission",
dto -> dto.isTransmitting() != null, RemoteGroupPort::isRunning)
.setConvertName(PORT_NAME_CONVERT)
.setConvertRawValue(v -> Boolean.valueOf(v) ? "enabled" : "disabled"),
new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Concurrent Tasks",
dto -> dto.getConcurrentlySchedulableTaskCount() != null, RemoteGroupPort::getMaxConcurrentTasks)
.setConvertName(PORT_NAME_CONVERT),
new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Compressed",
dto -> dto.getUseCompression() != null, RemoteGroupPort::isUseCompression)
.setConvertName(PORT_NAME_CONVERT)
);
/**
* Audits the creation of remote process groups via createRemoteProcessGroup().
*
@ -79,6 +125,109 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
return remoteProcessGroup;
}
/**
* Provides a higher order functions to compute Configuration audit events.
* @param <CMP> Class of a component, such as RemoteProcessGroup or RemoteGroupPort
* @param <DTO> Class of a DTO, such as RemoteProcessGroupDTO or RemoteProcessGroupPortDTO
*/
private static class ConfigurationRecorder<CMP, DTO> {
final String name;
final Function<DTO, Boolean> hasInput;
final Function<CMP, Object> getValue;
private Function<String, String> convertRawValue;
private BiFunction<CMP, String, String> convertName;
/**
* Create a recorder for a configuration property.
* @param name name of the target property
* @param hasInput a function that returns whether the property is being updated by a request
* @param getValue a function that returns value of the property
*/
private ConfigurationRecorder(String name, Function<DTO, Boolean> hasInput, Function<CMP, Object> getValue) {
this.name = name;
this.hasInput = hasInput;
this.getValue = getValue;
}
private String convertRawValue(final String value) {
return convertRawValue != null ? convertRawValue.apply(value) : value;
}
/**
* If a property value needs to be converted for audit record, e.g. sensitive values,
* use this method to specify a function to convert raw value.
* @param convertRawValue a function to convert string representation of a property value
* @return converted value
*/
private ConfigurationRecorder<CMP, DTO> setConvertRawValue(final Function<String, String> convertRawValue) {
this.convertRawValue = convertRawValue;
return this;
}
/**
* If a property name needs to be decorated depends on other values in a context,
* use this method to specify a function to convert name.
* @param convertName a function to convert name of a property
* @return converted name
*/
private ConfigurationRecorder<CMP, DTO> setConvertName(final BiFunction<CMP, String, String> convertName) {
this.convertName = convertName;
return this;
}
private ConfigureDetails checkConfigured(final DTO input,
final CMP updated,
final Object previousValue) {
final Object updatedValue = getValue.apply(updated);
// Convert null to empty String to avoid NullPointerException.
final String updatedStr = updatedValue != null ? updatedValue.toString() : "";
final String previousStr = previousValue != null ? previousValue.toString() : "";
if (hasInput.apply(input) && !updatedStr.equals(previousStr)) {
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName(convertName != null ? convertName.apply(updated, name) : name);
configDetails.setPreviousValue(convertRawValue(previousStr));
configDetails.setValue(convertRawValue(updatedStr));
return configDetails;
}
return null;
}
/**
* Capture values before a component to be updated. This method should be called before proceeding a joint point.
* @param recorders list of ConfigurationRecorder
* @param component a target component to capture
* @param <C> Class of the target component
* @param <D> DTO class of the target component
* @return captured values keyed with its name
*/
private static <C, D> Map<String, Object> capturePreviousValues(final List<ConfigurationRecorder<C, D>> recorders, final C component) {
final Map<String, Object> previousValues = new HashMap<>(recorders.size());
recorders.forEach(r -> previousValues.put(r.name, r.getValue.apply(component)));
return previousValues;
}
/**
* Generate ActionDetails for properties those have been updated.
* This method should be called after proceeding a joint point with an updated component.
* @param recorders list of ConfigurationRecorder
* @param dto DTO instance containing requested values
* @param updatedComponent a component instance that is updated by corresponding DAO
* @param previousValues previous property values before being updated
* @param details a Collection to accumulate action details generated
* @param <C> Class of the target component
* @param <D> DTO class of the target component
*/
private static <C, D> void checkConfigured(final List<ConfigurationRecorder<C, D>> recorders, final D dto, final C updatedComponent,
final Map<String, Object> previousValues, final Collection<ActionDetails> details) {
recorders.stream()
.map(r -> r.checkConfigured(dto, updatedComponent, previousValues.get(r.name)))
.filter(Objects::nonNull).forEach(d -> details.add(d));
}
}
/**
* Audits the update of remote process group configuration.
*
@ -98,18 +247,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// record the current value of this remoteProcessGroups configuration for comparisons later
final boolean transmissionState = remoteProcessGroup.isTransmitting();
final String communicationsTimeout = remoteProcessGroup.getCommunicationsTimeout();
final String yieldDuration = remoteProcessGroup.getYieldDuration();
final Map<String, Integer> concurrentTasks = new HashMap<>();
final Map<String, Boolean> compression = new HashMap<>();
for (final RemoteGroupPort remotePort : remoteProcessGroup.getInputPorts()) {
concurrentTasks.put(remotePort.getIdentifier(), remotePort.getMaxConcurrentTasks());
compression.put(remotePort.getIdentifier(), remotePort.isUseCompression());
}
for (final RemoteGroupPort remotePort : remoteProcessGroup.getOutputPorts()) {
concurrentTasks.put(remotePort.getIdentifier(), remotePort.getMaxConcurrentTasks());
compression.put(remotePort.getIdentifier(), remotePort.isUseCompression());
}
final Map<String, Object> previousValues = ConfigurationRecorder.capturePreviousValues(CONFIG_RECORDERS, remoteProcessGroup);
// perform the underlying operation
final RemoteProcessGroup updatedRemoteProcessGroup = (RemoteProcessGroup) proceedingJoinPoint.proceed();
@ -119,140 +257,26 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// ensure the user was found
if (user != null) {
Collection<ActionDetails> details = new ArrayList<>();
final Collection<ActionDetails> details = new ArrayList<>();
// see if the communications timeout has changed
if (remoteProcessGroupDTO.getCommunicationsTimeout() != null && !updatedRemoteProcessGroup.getCommunicationsTimeout().equals(communicationsTimeout)) {
// create the config details
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Communications Timeout");
configDetails.setValue(updatedRemoteProcessGroup.getCommunicationsTimeout());
configDetails.setPreviousValue(communicationsTimeout);
// see if any property has changed
ConfigurationRecorder.checkConfigured(CONFIG_RECORDERS, remoteProcessGroupDTO, updatedRemoteProcessGroup, previousValues, details);
details.add(configDetails);
}
// see if the yield duration has changed
if (remoteProcessGroupDTO.getYieldDuration() != null && !updatedRemoteProcessGroup.getYieldDuration().equals(yieldDuration)) {
// create the config details
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Yield Duration");
configDetails.setValue(updatedRemoteProcessGroup.getYieldDuration());
configDetails.setPreviousValue(yieldDuration);
details.add(configDetails);
}
// see if the contents of this remote process group are possibly changing
if (remoteProcessGroupDTO.getContents() != null) {
final RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents();
// see if any input port configuration is changing
if (contents.getInputPorts() != null) {
for (final RemoteProcessGroupPortDTO remotePortDTO : contents.getInputPorts()) {
final RemoteGroupPort remotePort = updatedRemoteProcessGroup.getInputPort(remotePortDTO.getId());
// if this port has been removed, ignore the configuration change for auditing purposes
if (remotePort == null) {
continue;
}
// if a new concurrent task count is specified
if (remotePortDTO.getConcurrentlySchedulableTaskCount() != null) {
// see if the concurrent tasks has changed
final Integer previousConcurrentTasks = concurrentTasks.get(remotePortDTO.getId());
if (previousConcurrentTasks != null && remotePort.getMaxConcurrentTasks() != previousConcurrentTasks) {
// create the config details
FlowChangeConfigureDetails concurrentTasksDetails = new FlowChangeConfigureDetails();
concurrentTasksDetails.setName("Concurrent Tasks");
concurrentTasksDetails.setValue(String.valueOf(remotePort.getMaxConcurrentTasks()));
concurrentTasksDetails.setPreviousValue(String.valueOf(previousConcurrentTasks));
details.add(concurrentTasksDetails);
}
}
// if a new compressed flag is specified
if (remotePortDTO.getUseCompression() != null) {
// see if the compression has changed
final Boolean previousCompression = compression.get(remotePortDTO.getId());
if (previousCompression != null && remotePort.isUseCompression() != previousCompression) {
// create the config details
FlowChangeConfigureDetails compressionDetails = new FlowChangeConfigureDetails();
compressionDetails.setName("Compressed");
compressionDetails.setValue(String.valueOf(remotePort.isUseCompression()));
compressionDetails.setPreviousValue(String.valueOf(previousCompression));
details.add(compressionDetails);
}
}
}
}
// see if any output port configuration is changing
if (contents.getOutputPorts() != null) {
for (final RemoteProcessGroupPortDTO remotePortDTO : contents.getOutputPorts()) {
final RemoteGroupPort remotePort = updatedRemoteProcessGroup.getOutputPort(remotePortDTO.getId());
// if this port has been removed, ignore the configuration change for auditing purposes
if (remotePort == null) {
continue;
}
// if a new concurrent task count is specified
if (remotePortDTO.getConcurrentlySchedulableTaskCount() != null) {
// see if the concurrent tasks has changed
final Integer previousConcurrentTasks = concurrentTasks.get(remotePortDTO.getId());
if (previousConcurrentTasks != null && remotePort.getMaxConcurrentTasks() != previousConcurrentTasks) {
// create the config details
FlowChangeConfigureDetails concurrentTasksDetails = new FlowChangeConfigureDetails();
concurrentTasksDetails.setName("Concurrent Tasks");
concurrentTasksDetails.setValue(String.valueOf(remotePort.getMaxConcurrentTasks()));
concurrentTasksDetails.setPreviousValue(String.valueOf(previousConcurrentTasks));
details.add(concurrentTasksDetails);
}
}
// if a new compressed flag is specified
if (remotePortDTO.getUseCompression() != null) {
// see if the compression has changed
final Boolean previousCompression = compression.get(remotePortDTO.getId());
if (previousCompression != null && remotePort.isUseCompression() != previousCompression) {
// create the config details
FlowChangeConfigureDetails compressionDetails = new FlowChangeConfigureDetails();
compressionDetails.setName("Compressed");
compressionDetails.setValue(String.valueOf(remotePort.isUseCompression()));
compressionDetails.setPreviousValue(String.valueOf(previousCompression));
details.add(compressionDetails);
}
}
}
}
}
Collection<Action> actions = new ArrayList<>();
final Date timestamp = new Date();
final Collection<Action> actions = new ArrayList<>();
// create the remote process group details
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri());
final FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = createFlowChangeDetails(remoteProcessGroup);
// save the actions if necessary
if (!details.isEmpty()) {
Date timestamp = new Date();
// create the actions
for (ActionDetails detail : details) {
// create the port action for updating the name
FlowChangeAction remoteProcessGroupAction = new FlowChangeAction();
remoteProcessGroupAction.setUserIdentity(user.getIdentity());
// create a configure action for each updated property
FlowChangeAction remoteProcessGroupAction = createFlowChangeAction(user, timestamp,
updatedRemoteProcessGroup, remoteProcessGroupDetails);
remoteProcessGroupAction.setOperation(Operation.Configure);
remoteProcessGroupAction.setTimestamp(timestamp);
remoteProcessGroupAction.setSourceId(updatedRemoteProcessGroup.getIdentifier());
remoteProcessGroupAction.setSourceName(updatedRemoteProcessGroup.getName());
remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup);
remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails);
remoteProcessGroupAction.setActionDetails(detail);
actions.add(remoteProcessGroupAction);
@ -264,14 +288,9 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// determine if the running state has changed
if (transmissionState != updatedTransmissionState) {
// create a processor action
FlowChangeAction remoteProcessGroupAction = new FlowChangeAction();
remoteProcessGroupAction.setUserIdentity(user.getIdentity());
remoteProcessGroupAction.setTimestamp(new Date());
remoteProcessGroupAction.setSourceId(updatedRemoteProcessGroup.getIdentifier());
remoteProcessGroupAction.setSourceName(updatedRemoteProcessGroup.getName());
remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup);
remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails);
// create a remote process group action
FlowChangeAction remoteProcessGroupAction = createFlowChangeAction(user, timestamp,
updatedRemoteProcessGroup, remoteProcessGroupDetails);
// set the operation accordingly
if (updatedTransmissionState) {
@ -292,6 +311,20 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
return updatedRemoteProcessGroup;
}
private FlowChangeAction createFlowChangeAction(final NiFiUser user, final Date timestamp,
final RemoteProcessGroup remoteProcessGroup,
final FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails) {
FlowChangeAction remoteProcessGroupAction = new FlowChangeAction();
remoteProcessGroupAction.setUserIdentity(user.getIdentity());
remoteProcessGroupAction.setTimestamp(timestamp);
remoteProcessGroupAction.setSourceId(remoteProcessGroup.getIdentifier());
remoteProcessGroupAction.setSourceName(remoteProcessGroup.getName());
remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup);
remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails);
return remoteProcessGroupAction;
}
/**
* Audits the removal of a process group via deleteProcessGroup().
*
@ -320,6 +353,101 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
}
}
private RemoteGroupPort auditUpdateProcessGroupPortConfiguration(ProceedingJoinPoint proceedingJoinPoint, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup, RemoteGroupPort remoteProcessGroupPort) throws Throwable {
final Map<String, Object> previousValues = ConfigurationRecorder.capturePreviousValues(PORT_CONFIG_RECORDERS, remoteProcessGroupPort);
// perform the underlying operation
final RemoteGroupPort updatedRemoteProcessGroupPort = (RemoteGroupPort) proceedingJoinPoint.proceed();
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user != null) {
final Collection<ActionDetails> details = new ArrayList<>();
// see if any property has changed
ConfigurationRecorder.checkConfigured(PORT_CONFIG_RECORDERS, remoteProcessGroupPortDto, updatedRemoteProcessGroupPort, previousValues, details);
final Date timestamp = new Date();
final Collection<Action> actions = new ArrayList<>();
// create the remote process group details
final FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = createFlowChangeDetails(remoteProcessGroup);
// save the actions if necessary
for (ActionDetails detail : details) {
// create a configure action for each updated property
FlowChangeAction remoteProcessGroupAction = createFlowChangeAction(user, timestamp,
remoteProcessGroup, remoteProcessGroupDetails);
remoteProcessGroupAction.setOperation(Operation.Configure);
remoteProcessGroupAction.setActionDetails(detail);
actions.add(remoteProcessGroupAction);
}
// ensure there are actions to record
if (!actions.isEmpty()) {
// save the actions
saveActions(actions, logger);
}
}
return updatedRemoteProcessGroupPort;
}
/**
* Audits the update of remote process group input port configuration.
*
* @param proceedingJoinPoint join point
* @param remoteProcessGroupPortDto dto
* @param remoteProcessGroupDAO dao
* @return group
* @throws Throwable ex
*/
@Around("within(org.apache.nifi.web.dao.RemoteProcessGroupDAO+) && "
+ "execution(org.apache.nifi.remote.RemoteGroupPort updateRemoteProcessGroupInputPort(java.lang.String, org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO)) && "
+ "args(remoteProcessGroupId, remoteProcessGroupPortDto) && "
+ "target(remoteProcessGroupDAO)")
public RemoteGroupPort auditUpdateProcessGroupInputPortConfiguration(
ProceedingJoinPoint proceedingJoinPoint, String remoteProcessGroupId,
RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroupDAO remoteProcessGroupDAO) throws Throwable {
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
final RemoteGroupPort remoteProcessGroupPort = remoteProcessGroup.getInputPort(remoteProcessGroupPortDto.getId());
return auditUpdateProcessGroupPortConfiguration(proceedingJoinPoint, remoteProcessGroupPortDto, remoteProcessGroup, remoteProcessGroupPort);
}
/**
* Audits the update of remote process group output port configuration.
*
* @param proceedingJoinPoint join point
* @param remoteProcessGroupPortDto dto
* @param remoteProcessGroupDAO dao
* @return group
* @throws Throwable ex
*/
@Around("within(org.apache.nifi.web.dao.RemoteProcessGroupDAO+) && "
+ "execution(org.apache.nifi.remote.RemoteGroupPort updateRemoteProcessGroupOutputPort(java.lang.String, org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO)) && "
+ "args(remoteProcessGroupId, remoteProcessGroupPortDto) && "
+ "target(remoteProcessGroupDAO)")
public RemoteGroupPort auditUpdateProcessGroupOutputPortConfiguration(
ProceedingJoinPoint proceedingJoinPoint, String remoteProcessGroupId,
RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroupDAO remoteProcessGroupDAO) throws Throwable {
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
final RemoteGroupPort remoteProcessGroupPort = remoteProcessGroup.getOutputPort(remoteProcessGroupPortDto.getId());
return auditUpdateProcessGroupPortConfiguration(proceedingJoinPoint, remoteProcessGroupPortDto, remoteProcessGroup, remoteProcessGroupPort);
}
private FlowChangeRemoteProcessGroupDetails createFlowChangeDetails(RemoteProcessGroup remoteProcessGroup) {
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri());
return remoteProcessGroupDetails;
}
/**
* Generates an audit record for the specified remote process group.
*
@ -348,8 +476,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// ensure the user was found
if (user != null) {
// create the remote process group details
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri().toString());
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = createFlowChangeDetails(remoteProcessGroup);
// create the remote process group action
action = new FlowChangeAction();

View File

@ -0,0 +1,556 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.audit;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.RemoteProcessGroupDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserDetails;
import org.apache.nifi.authorization.user.StandardNiFiUser;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.Before;
import org.junit.Test;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestRemoteProcessGroupAuditor {
@Before
public void setup() {
final SecurityContext securityContext = SecurityContextHolder.getContext();
final Authentication authentication = mock(Authentication.class);
securityContext.setAuthentication(authentication);
final NiFiUser user = new StandardNiFiUser("user-id");
final NiFiUserDetails userDetail = new NiFiUserDetails(user);
when(authentication.getPrincipal()).thenReturn(userDetail);
}
@SuppressWarnings("unchecked")
private Collection<Action> updateProcessGroupConfiguration(RemoteProcessGroupDTO inputRPGDTO, RemoteProcessGroup existingRPG) throws Throwable {
final RemoteProcessGroupAuditor auditor = new RemoteProcessGroupAuditor();
final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
final String remoteProcessGroupId = "remote-process-group-id";
inputRPGDTO.setId(remoteProcessGroupId);
final String targetUrl = "http://localhost:8080/nifi";
when(existingRPG.getTargetUri()).thenReturn(targetUrl);
final RemoteProcessGroupDAO remoteProcessGroupDAO = mock(RemoteProcessGroupDAO.class);
when(remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId))
.thenReturn(existingRPG);
// Setup updatedRPG mock based on inputRPGDTO.
final RemoteProcessGroup updatedRPG = mock(RemoteProcessGroup.class);
when(updatedRPG.getIdentifier()).thenReturn(remoteProcessGroupId);
when(updatedRPG.isTransmitting()).thenReturn(inputRPGDTO.isTransmitting());
when(updatedRPG.getCommunicationsTimeout()).thenReturn(inputRPGDTO.getCommunicationsTimeout());
when(updatedRPG.getYieldDuration()).thenReturn(inputRPGDTO.getYieldDuration());
when(updatedRPG.getTransportProtocol())
.thenReturn(SiteToSiteTransportProtocol.valueOf(inputRPGDTO.getTransportProtocol()));
when(updatedRPG.getProxyHost()).thenReturn(inputRPGDTO.getProxyHost());
when(updatedRPG.getProxyPort()).thenReturn(inputRPGDTO.getProxyPort());
when(updatedRPG.getProxyUser()).thenReturn(inputRPGDTO.getProxyUser());
when(updatedRPG.getProxyPassword()).thenReturn(inputRPGDTO.getProxyPassword());
when(joinPoint.proceed()).thenReturn(updatedRPG);
// Capture added actions so that those can be asserted later.
final AuditService auditService = mock(AuditService.class);
final AtomicReference<Collection<Action>> addedActions = new AtomicReference<>();
doAnswer(invocation -> {
Collection<Action> actions = invocation.getArgumentAt(0, Collection.class);
addedActions.set(actions);
return null;
}).when(auditService).addActions(any());
auditor.setAuditService(auditService);
auditor.auditUpdateProcessGroupConfiguration(joinPoint, inputRPGDTO, remoteProcessGroupDAO);
final Collection<Action> actions = addedActions.get();
// Assert common action values.
if (actions != null) {
actions.forEach(action -> {
assertEquals(remoteProcessGroupId, action.getSourceId());
assertEquals("user-id", action.getUserIdentity());
assertEquals(targetUrl, ((RemoteProcessGroupDetails)action.getComponentDetails()).getUri());
assertNotNull(action.getTimestamp());
});
}
return actions;
}
private RemoteProcessGroup defaultRemoteProcessGroup() {
final RemoteProcessGroup existingRPG = mock(RemoteProcessGroup.class);
when(existingRPG.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
when(existingRPG.isTransmitting()).thenReturn(false);
when(existingRPG.getProxyPort()).thenReturn(null);
return existingRPG;
}
private RemoteProcessGroupDTO defaultInput() {
final RemoteProcessGroupDTO inputRPGDTO = new RemoteProcessGroupDTO();
inputRPGDTO.setTransportProtocol("RAW");
inputRPGDTO.setTransmitting(false);
return inputRPGDTO;
}
@Test
public void testEnableTransmission() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.isTransmitting()).thenReturn(false);
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setTransmitting(true);
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Start, action.getOperation());
assertNull(action.getActionDetails());
}
@Test
public void testDisableTransmission() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.isTransmitting()).thenReturn(true);
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setTransmitting(false);
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Stop, action.getOperation());
assertNull(action.getActionDetails());
}
@Test
public void testConfigureCommunicationsTimeout() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.getCommunicationsTimeout()).thenReturn("30 sec");
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setCommunicationsTimeout("31 sec");
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Communications Timeout",
existingRPG.getCommunicationsTimeout(), inputRPGDTO.getCommunicationsTimeout());
}
private void assertConfigureDetails(final ActionDetails actionDetails, final String name,
final Object previousValue, final Object value) {
final ConfigureDetails configureDetails = (ConfigureDetails) actionDetails;
assertEquals(name, configureDetails.getName());
assertEquals(previousValue != null ? previousValue.toString() : "", configureDetails.getPreviousValue());
assertEquals(value != null ? value.toString() : "", configureDetails.getValue());
}
@Test
public void testConfigureYieldDuration() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.getYieldDuration()).thenReturn("10 sec");
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setYieldDuration("11 sec");
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Yield Duration",
existingRPG.getYieldDuration(), inputRPGDTO.getYieldDuration());
}
@Test
public void testConfigureTransportProtocol() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setTransportProtocol("HTTP");
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Transport Protocol",
existingRPG.getTransportProtocol().name(), inputRPGDTO.getTransportProtocol());
}
@Test
public void testConfigureProxyHost() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyHost("proxy.example.com");
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy Host",
existingRPG.getProxyHost(), inputRPGDTO.getProxyHost());
}
@Test
public void testConfigureProxyHostUpdate() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.getProxyHost()).thenReturn("proxy1.example.com");
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyHost("proxy2.example.com");
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy Host",
existingRPG.getProxyHost(), inputRPGDTO.getProxyHost());
}
@Test
public void testConfigureProxyHostClear() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.getProxyHost()).thenReturn("proxy.example.com");
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyHost("");
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy Host",
existingRPG.getProxyHost(), inputRPGDTO.getProxyHost());
}
@Test
public void testConfigureProxyPort() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyPort(3128);
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy Port",
existingRPG.getProxyPort(), inputRPGDTO.getProxyPort());
}
@Test
public void testConfigureProxyPortClear() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.getProxyPort()).thenReturn(3128);
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyPort(null);
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy Port",
existingRPG.getProxyPort(), inputRPGDTO.getProxyPort());
}
@Test
public void testConfigureProxyUser() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyUser("proxy-user");
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy User",
existingRPG.getProxyUser(), inputRPGDTO.getProxyUser());
}
@Test
public void testConfigureProxyUserClear() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.getProxyUser()).thenReturn("proxy-user");
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyUser(null);
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy User",
existingRPG.getProxyUser(), inputRPGDTO.getProxyUser());
}
@Test
public void testConfigureProxyPassword() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyPassword("proxy-password");
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy Password","", SENSITIVE_VALUE_MASK);
}
@Test
public void testConfigureProxyPasswordClear() throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
when(existingRPG.getProxyPassword()).thenReturn("proxy-password");
final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
inputRPGDTO.setProxyPassword(null);
final Collection<Action> actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "Proxy Password", SENSITIVE_VALUE_MASK, "");
}
@SuppressWarnings("unchecked")
private Collection<Action> updateProcessGroupInputPortConfiguration(RemoteProcessGroupPortDTO inputRPGPortDTO, RemoteGroupPort existingRPGPort) throws Throwable {
final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
final RemoteProcessGroupAuditor auditor = new RemoteProcessGroupAuditor();
final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
final String remoteProcessGroupId = "remote-process-group-id";
inputRPGPortDTO.setId(remoteProcessGroupId);
final String targetUrl = "http://localhost:8080/nifi";
when(existingRPG.getIdentifier()).thenReturn(remoteProcessGroupId);
when(existingRPG.getTargetUri()).thenReturn(targetUrl);
final RemoteProcessGroupDAO remoteProcessGroupDAO = mock(RemoteProcessGroupDAO.class);
when(remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId))
.thenReturn(existingRPG);
when(existingRPG.getInputPort(eq(inputRPGPortDTO.getId()))).thenReturn(existingRPGPort);
// Setup updatedRPGPort mock based on inputRPGPortDTO.
final RemoteGroupPort updatedRPGPort = mock(RemoteGroupPort.class);
final String portName = existingRPGPort.getName();
when(updatedRPGPort.getName()).thenReturn(portName);
if (inputRPGPortDTO.isTransmitting() != null) {
when(updatedRPGPort.isRunning()).thenReturn(inputRPGPortDTO.isTransmitting());
}
when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount());
when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression());
when(joinPoint.proceed()).thenReturn(updatedRPGPort);
// Capture added actions so that those can be asserted later.
final AuditService auditService = mock(AuditService.class);
final AtomicReference<Collection<Action>> addedActions = new AtomicReference<>();
doAnswer(invocation -> {
Collection<Action> actions = invocation.getArgumentAt(0, Collection.class);
addedActions.set(actions);
return null;
}).when(auditService).addActions(any());
auditor.setAuditService(auditService);
auditor.auditUpdateProcessGroupInputPortConfiguration(joinPoint, remoteProcessGroupId, inputRPGPortDTO, remoteProcessGroupDAO);
final Collection<Action> actions = addedActions.get();
// Assert common action values.
if (actions != null) {
actions.forEach(action -> {
assertEquals(remoteProcessGroupId, action.getSourceId());
assertEquals("user-id", action.getUserIdentity());
assertEquals(targetUrl, ((RemoteProcessGroupDetails)action.getComponentDetails()).getUri());
assertNotNull(action.getTimestamp());
});
}
return actions;
}
private RemoteGroupPort defaultRemoteGroupPort() {
final RemoteGroupPort existingRPGPort = mock(RemoteGroupPort.class);
when(existingRPGPort.isRunning()).thenReturn(false);
when(existingRPGPort.getMaxConcurrentTasks()).thenReturn(1);
when(existingRPGPort.isUseCompression()).thenReturn(false);
return existingRPGPort;
}
private RemoteProcessGroupPortDTO defaultRemoteProcessGroupPortDTO() {
final RemoteProcessGroupPortDTO inputRPGPortDTO = new RemoteProcessGroupPortDTO();
inputRPGPortDTO.setConcurrentlySchedulableTaskCount(1);
inputRPGPortDTO.setUseCompression(false);
return inputRPGPortDTO;
}
@Test
public void testEnablePort() throws Throwable {
final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
when(existingRPGPort.getName()).thenReturn("input-port-1");
when(existingRPGPort.isRunning()).thenReturn(false);
final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO();
inputRPGPortDTO.setTransmitting(true);
final Collection<Action> actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "input-port-1.Transmission", "disabled", "enabled");
}
@Test
public void testDisablePort() throws Throwable {
final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
when(existingRPGPort.getName()).thenReturn("input-port-1");
when(existingRPGPort.isRunning()).thenReturn(true);
final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO();
inputRPGPortDTO.setTransmitting(false);
final Collection<Action> actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "input-port-1.Transmission", "enabled", "disabled");
}
@Test
public void testConfigurePortConcurrency() throws Throwable {
final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
when(existingRPGPort.getName()).thenReturn("input-port-1");
final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO();
inputRPGPortDTO.setConcurrentlySchedulableTaskCount(2);
final Collection<Action> actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "input-port-1.Concurrent Tasks", "1", "2");
}
@Test
public void testConfigurePortCompression() throws Throwable {
final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
when(existingRPGPort.getName()).thenReturn("input-port-1");
final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO();
inputRPGPortDTO.setUseCompression(true);
final Collection<Action> actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
assertEquals(1, actions.size());
final Action action = actions.iterator().next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "input-port-1.Compressed", "false", "true");
}
}