From 5af6eb17b0c1be9aca201507769f6b530d5ba0f6 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 7 Feb 2017 10:11:30 +0900 Subject: [PATCH] NIFI-3162: Audit RPG and RPG port config changes. - Added configure audits for Transport Protocol, HTTP Proxy Server Host, Port, User and Password in RemoteProcessGroup configuration - Added configure audits for enabling/disabling individual remote port - Added configure audits for Concurrent Tasks and Compressed in Remote Port configuration - This closes #1476 --- .../nifi/audit/RemoteProcessGroupAuditor.java | 421 ++++++++----- .../audit/TestRemoteProcessGroupAuditor.java | 556 ++++++++++++++++++ 2 files changed, 830 insertions(+), 147 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java index e119437ce4..65995f5ddf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java @@ -22,12 +22,13 @@ import org.apache.nifi.action.FlowChangeAction; import org.apache.nifi.action.Operation; import org.apache.nifi.action.component.details.FlowChangeRemoteProcessGroupDetails; import org.apache.nifi.action.details.ActionDetails; +import org.apache.nifi.action.details.ConfigureDetails; import org.apache.nifi.action.details.FlowChangeConfigureDetails; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; +import org.apache.nifi.util.StringUtils; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; @@ -38,10 +39,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK; /** * Audits remote process group creation/removal and configuration changes. @@ -51,6 +59,44 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { private static final Logger logger = LoggerFactory.getLogger(RemoteProcessGroupAuditor.class); + // Proxy settings should be able to be null cleared, so the necessity of checking those properties depend on + // whether transport protocol is specified. + // See StandardRemoteProcessGroupDAO.updateRemoteProcessGroup for detail. + private static final Function IS_TRANSPORT_PROTOCOL_SET = dto -> dto.getTransportProtocol() != null; + + private static final List> CONFIG_RECORDERS = Arrays.asList( + new ConfigurationRecorder("Communications Timeout", + dto -> dto.getCommunicationsTimeout() != null, RemoteProcessGroup::getCommunicationsTimeout), + new ConfigurationRecorder("Yield Duration", + dto -> dto.getYieldDuration() != null, RemoteProcessGroup::getYieldDuration), + new ConfigurationRecorder("Transport Protocol", + IS_TRANSPORT_PROTOCOL_SET, rpg -> rpg.getTransportProtocol().name()), + new ConfigurationRecorder<>("Proxy Host", + IS_TRANSPORT_PROTOCOL_SET, RemoteProcessGroup::getProxyHost), + new ConfigurationRecorder<>("Proxy Port", + IS_TRANSPORT_PROTOCOL_SET, RemoteProcessGroup::getProxyPort), + new ConfigurationRecorder<>("Proxy User", + IS_TRANSPORT_PROTOCOL_SET, RemoteProcessGroup::getProxyUser), + new ConfigurationRecorder<>("Proxy Password", + IS_TRANSPORT_PROTOCOL_SET, RemoteProcessGroup::getProxyPassword) + .setConvertRawValue(v -> StringUtils.isEmpty(v) ? "" : SENSITIVE_VALUE_MASK) + ); + + + private static final BiFunction PORT_NAME_CONVERT = (updated, name) -> updated.getName() + "." + name; + private static final List> PORT_CONFIG_RECORDERS = Arrays.asList( + new ConfigurationRecorder("Transmission", + dto -> dto.isTransmitting() != null, RemoteGroupPort::isRunning) + .setConvertName(PORT_NAME_CONVERT) + .setConvertRawValue(v -> Boolean.valueOf(v) ? "enabled" : "disabled"), + new ConfigurationRecorder("Concurrent Tasks", + dto -> dto.getConcurrentlySchedulableTaskCount() != null, RemoteGroupPort::getMaxConcurrentTasks) + .setConvertName(PORT_NAME_CONVERT), + new ConfigurationRecorder("Compressed", + dto -> dto.getUseCompression() != null, RemoteGroupPort::isUseCompression) + .setConvertName(PORT_NAME_CONVERT) + ); + /** * Audits the creation of remote process groups via createRemoteProcessGroup(). * @@ -79,10 +125,113 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { return remoteProcessGroup; } + /** + * Provides a higher order functions to compute Configuration audit events. + * @param Class of a component, such as RemoteProcessGroup or RemoteGroupPort + * @param Class of a DTO, such as RemoteProcessGroupDTO or RemoteProcessGroupPortDTO + */ + private static class ConfigurationRecorder { + final String name; + final Function hasInput; + final Function getValue; + private Function convertRawValue; + private BiFunction convertName; + + /** + * Create a recorder for a configuration property. + * @param name name of the target property + * @param hasInput a function that returns whether the property is being updated by a request + * @param getValue a function that returns value of the property + */ + private ConfigurationRecorder(String name, Function hasInput, Function getValue) { + this.name = name; + this.hasInput = hasInput; + this.getValue = getValue; + } + + private String convertRawValue(final String value) { + return convertRawValue != null ? convertRawValue.apply(value) : value; + } + + /** + * If a property value needs to be converted for audit record, e.g. sensitive values, + * use this method to specify a function to convert raw value. + * @param convertRawValue a function to convert string representation of a property value + * @return converted value + */ + private ConfigurationRecorder setConvertRawValue(final Function convertRawValue) { + this.convertRawValue = convertRawValue; + return this; + } + + /** + * If a property name needs to be decorated depends on other values in a context, + * use this method to specify a function to convert name. + * @param convertName a function to convert name of a property + * @return converted name + */ + private ConfigurationRecorder setConvertName(final BiFunction convertName) { + this.convertName = convertName; + return this; + } + + private ConfigureDetails checkConfigured(final DTO input, + final CMP updated, + final Object previousValue) { + + final Object updatedValue = getValue.apply(updated); + // Convert null to empty String to avoid NullPointerException. + final String updatedStr = updatedValue != null ? updatedValue.toString() : ""; + final String previousStr = previousValue != null ? previousValue.toString() : ""; + if (hasInput.apply(input) && !updatedStr.equals(previousStr)) { + + FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails(); + configDetails.setName(convertName != null ? convertName.apply(updated, name) : name); + configDetails.setPreviousValue(convertRawValue(previousStr)); + configDetails.setValue(convertRawValue(updatedStr)); + return configDetails; + } + return null; + } + + /** + * Capture values before a component to be updated. This method should be called before proceeding a joint point. + * @param recorders list of ConfigurationRecorder + * @param component a target component to capture + * @param Class of the target component + * @param DTO class of the target component + * @return captured values keyed with its name + */ + private static Map capturePreviousValues(final List> recorders, final C component) { + final Map previousValues = new HashMap<>(recorders.size()); + recorders.forEach(r -> previousValues.put(r.name, r.getValue.apply(component))); + return previousValues; + } + + /** + * Generate ActionDetails for properties those have been updated. + * This method should be called after proceeding a joint point with an updated component. + * @param recorders list of ConfigurationRecorder + * @param dto DTO instance containing requested values + * @param updatedComponent a component instance that is updated by corresponding DAO + * @param previousValues previous property values before being updated + * @param details a Collection to accumulate action details generated + * @param Class of the target component + * @param DTO class of the target component + */ + private static void checkConfigured(final List> recorders, final D dto, final C updatedComponent, + final Map previousValues, final Collection details) { + recorders.stream() + .map(r -> r.checkConfigured(dto, updatedComponent, previousValues.get(r.name))) + .filter(Objects::nonNull).forEach(d -> details.add(d)); + + } + } + /** * Audits the update of remote process group configuration. * - * @param proceedingJoinPoint join point + * @param proceedingJoinPoint join point * @param remoteProcessGroupDTO dto * @param remoteProcessGroupDAO dao * @return group @@ -98,18 +247,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { // record the current value of this remoteProcessGroups configuration for comparisons later final boolean transmissionState = remoteProcessGroup.isTransmitting(); - final String communicationsTimeout = remoteProcessGroup.getCommunicationsTimeout(); - final String yieldDuration = remoteProcessGroup.getYieldDuration(); - final Map concurrentTasks = new HashMap<>(); - final Map compression = new HashMap<>(); - for (final RemoteGroupPort remotePort : remoteProcessGroup.getInputPorts()) { - concurrentTasks.put(remotePort.getIdentifier(), remotePort.getMaxConcurrentTasks()); - compression.put(remotePort.getIdentifier(), remotePort.isUseCompression()); - } - for (final RemoteGroupPort remotePort : remoteProcessGroup.getOutputPorts()) { - concurrentTasks.put(remotePort.getIdentifier(), remotePort.getMaxConcurrentTasks()); - compression.put(remotePort.getIdentifier(), remotePort.isUseCompression()); - } + final Map previousValues = ConfigurationRecorder.capturePreviousValues(CONFIG_RECORDERS, remoteProcessGroup); // perform the underlying operation final RemoteProcessGroup updatedRemoteProcessGroup = (RemoteProcessGroup) proceedingJoinPoint.proceed(); @@ -119,140 +257,26 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { // ensure the user was found if (user != null) { - Collection details = new ArrayList<>(); + final Collection details = new ArrayList<>(); - // see if the communications timeout has changed - if (remoteProcessGroupDTO.getCommunicationsTimeout() != null && !updatedRemoteProcessGroup.getCommunicationsTimeout().equals(communicationsTimeout)) { - // create the config details - FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails(); - configDetails.setName("Communications Timeout"); - configDetails.setValue(updatedRemoteProcessGroup.getCommunicationsTimeout()); - configDetails.setPreviousValue(communicationsTimeout); + // see if any property has changed + ConfigurationRecorder.checkConfigured(CONFIG_RECORDERS, remoteProcessGroupDTO, updatedRemoteProcessGroup, previousValues, details); - details.add(configDetails); - } - - // see if the yield duration has changed - if (remoteProcessGroupDTO.getYieldDuration() != null && !updatedRemoteProcessGroup.getYieldDuration().equals(yieldDuration)) { - // create the config details - FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails(); - configDetails.setName("Yield Duration"); - configDetails.setValue(updatedRemoteProcessGroup.getYieldDuration()); - configDetails.setPreviousValue(yieldDuration); - - details.add(configDetails); - } - - // see if the contents of this remote process group are possibly changing - if (remoteProcessGroupDTO.getContents() != null) { - final RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents(); - - // see if any input port configuration is changing - if (contents.getInputPorts() != null) { - for (final RemoteProcessGroupPortDTO remotePortDTO : contents.getInputPorts()) { - final RemoteGroupPort remotePort = updatedRemoteProcessGroup.getInputPort(remotePortDTO.getId()); - - // if this port has been removed, ignore the configuration change for auditing purposes - if (remotePort == null) { - continue; - } - - // if a new concurrent task count is specified - if (remotePortDTO.getConcurrentlySchedulableTaskCount() != null) { - // see if the concurrent tasks has changed - final Integer previousConcurrentTasks = concurrentTasks.get(remotePortDTO.getId()); - if (previousConcurrentTasks != null && remotePort.getMaxConcurrentTasks() != previousConcurrentTasks) { - // create the config details - FlowChangeConfigureDetails concurrentTasksDetails = new FlowChangeConfigureDetails(); - concurrentTasksDetails.setName("Concurrent Tasks"); - concurrentTasksDetails.setValue(String.valueOf(remotePort.getMaxConcurrentTasks())); - concurrentTasksDetails.setPreviousValue(String.valueOf(previousConcurrentTasks)); - - details.add(concurrentTasksDetails); - } - } - - // if a new compressed flag is specified - if (remotePortDTO.getUseCompression() != null) { - // see if the compression has changed - final Boolean previousCompression = compression.get(remotePortDTO.getId()); - if (previousCompression != null && remotePort.isUseCompression() != previousCompression) { - // create the config details - FlowChangeConfigureDetails compressionDetails = new FlowChangeConfigureDetails(); - compressionDetails.setName("Compressed"); - compressionDetails.setValue(String.valueOf(remotePort.isUseCompression())); - compressionDetails.setPreviousValue(String.valueOf(previousCompression)); - - details.add(compressionDetails); - } - } - } - } - - // see if any output port configuration is changing - if (contents.getOutputPorts() != null) { - for (final RemoteProcessGroupPortDTO remotePortDTO : contents.getOutputPorts()) { - final RemoteGroupPort remotePort = updatedRemoteProcessGroup.getOutputPort(remotePortDTO.getId()); - - // if this port has been removed, ignore the configuration change for auditing purposes - if (remotePort == null) { - continue; - } - - // if a new concurrent task count is specified - if (remotePortDTO.getConcurrentlySchedulableTaskCount() != null) { - // see if the concurrent tasks has changed - final Integer previousConcurrentTasks = concurrentTasks.get(remotePortDTO.getId()); - if (previousConcurrentTasks != null && remotePort.getMaxConcurrentTasks() != previousConcurrentTasks) { - // create the config details - FlowChangeConfigureDetails concurrentTasksDetails = new FlowChangeConfigureDetails(); - concurrentTasksDetails.setName("Concurrent Tasks"); - concurrentTasksDetails.setValue(String.valueOf(remotePort.getMaxConcurrentTasks())); - concurrentTasksDetails.setPreviousValue(String.valueOf(previousConcurrentTasks)); - - details.add(concurrentTasksDetails); - } - } - - // if a new compressed flag is specified - if (remotePortDTO.getUseCompression() != null) { - // see if the compression has changed - final Boolean previousCompression = compression.get(remotePortDTO.getId()); - if (previousCompression != null && remotePort.isUseCompression() != previousCompression) { - // create the config details - FlowChangeConfigureDetails compressionDetails = new FlowChangeConfigureDetails(); - compressionDetails.setName("Compressed"); - compressionDetails.setValue(String.valueOf(remotePort.isUseCompression())); - compressionDetails.setPreviousValue(String.valueOf(previousCompression)); - - details.add(compressionDetails); - } - } - } - } - } - - Collection actions = new ArrayList<>(); + final Date timestamp = new Date(); + final Collection actions = new ArrayList<>(); // create the remote process group details - FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails(); - remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri()); + final FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = createFlowChangeDetails(remoteProcessGroup); // save the actions if necessary if (!details.isEmpty()) { - Date timestamp = new Date(); // create the actions for (ActionDetails detail : details) { - // create the port action for updating the name - FlowChangeAction remoteProcessGroupAction = new FlowChangeAction(); - remoteProcessGroupAction.setUserIdentity(user.getIdentity()); + // create a configure action for each updated property + FlowChangeAction remoteProcessGroupAction = createFlowChangeAction(user, timestamp, + updatedRemoteProcessGroup, remoteProcessGroupDetails); remoteProcessGroupAction.setOperation(Operation.Configure); - remoteProcessGroupAction.setTimestamp(timestamp); - remoteProcessGroupAction.setSourceId(updatedRemoteProcessGroup.getIdentifier()); - remoteProcessGroupAction.setSourceName(updatedRemoteProcessGroup.getName()); - remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup); - remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails); remoteProcessGroupAction.setActionDetails(detail); actions.add(remoteProcessGroupAction); @@ -264,14 +288,9 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { // determine if the running state has changed if (transmissionState != updatedTransmissionState) { - // create a processor action - FlowChangeAction remoteProcessGroupAction = new FlowChangeAction(); - remoteProcessGroupAction.setUserIdentity(user.getIdentity()); - remoteProcessGroupAction.setTimestamp(new Date()); - remoteProcessGroupAction.setSourceId(updatedRemoteProcessGroup.getIdentifier()); - remoteProcessGroupAction.setSourceName(updatedRemoteProcessGroup.getName()); - remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup); - remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails); + // create a remote process group action + FlowChangeAction remoteProcessGroupAction = createFlowChangeAction(user, timestamp, + updatedRemoteProcessGroup, remoteProcessGroupDetails); // set the operation accordingly if (updatedTransmissionState) { @@ -292,6 +311,20 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { return updatedRemoteProcessGroup; } + private FlowChangeAction createFlowChangeAction(final NiFiUser user, final Date timestamp, + final RemoteProcessGroup remoteProcessGroup, + final FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails) { + + FlowChangeAction remoteProcessGroupAction = new FlowChangeAction(); + remoteProcessGroupAction.setUserIdentity(user.getIdentity()); + remoteProcessGroupAction.setTimestamp(timestamp); + remoteProcessGroupAction.setSourceId(remoteProcessGroup.getIdentifier()); + remoteProcessGroupAction.setSourceName(remoteProcessGroup.getName()); + remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup); + remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails); + return remoteProcessGroupAction; + } + /** * Audits the removal of a process group via deleteProcessGroup(). * @@ -320,6 +353,101 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { } } + private RemoteGroupPort auditUpdateProcessGroupPortConfiguration(ProceedingJoinPoint proceedingJoinPoint, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup, RemoteGroupPort remoteProcessGroupPort) throws Throwable { + final Map previousValues = ConfigurationRecorder.capturePreviousValues(PORT_CONFIG_RECORDERS, remoteProcessGroupPort); + + // perform the underlying operation + final RemoteGroupPort updatedRemoteProcessGroupPort = (RemoteGroupPort) proceedingJoinPoint.proceed(); + + // get the current user + NiFiUser user = NiFiUserUtils.getNiFiUser(); + + if (user != null) { + final Collection details = new ArrayList<>(); + + // see if any property has changed + ConfigurationRecorder.checkConfigured(PORT_CONFIG_RECORDERS, remoteProcessGroupPortDto, updatedRemoteProcessGroupPort, previousValues, details); + + final Date timestamp = new Date(); + final Collection actions = new ArrayList<>(); + + // create the remote process group details + final FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = createFlowChangeDetails(remoteProcessGroup); + + // save the actions if necessary + for (ActionDetails detail : details) { + // create a configure action for each updated property + FlowChangeAction remoteProcessGroupAction = createFlowChangeAction(user, timestamp, + remoteProcessGroup, remoteProcessGroupDetails); + remoteProcessGroupAction.setOperation(Operation.Configure); + remoteProcessGroupAction.setActionDetails(detail); + + actions.add(remoteProcessGroupAction); + } + + // ensure there are actions to record + if (!actions.isEmpty()) { + // save the actions + saveActions(actions, logger); + } + } + + return updatedRemoteProcessGroupPort; + } + + /** + * Audits the update of remote process group input port configuration. + * + * @param proceedingJoinPoint join point + * @param remoteProcessGroupPortDto dto + * @param remoteProcessGroupDAO dao + * @return group + * @throws Throwable ex + */ + @Around("within(org.apache.nifi.web.dao.RemoteProcessGroupDAO+) && " + + "execution(org.apache.nifi.remote.RemoteGroupPort updateRemoteProcessGroupInputPort(java.lang.String, org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO)) && " + + "args(remoteProcessGroupId, remoteProcessGroupPortDto) && " + + "target(remoteProcessGroupDAO)") + public RemoteGroupPort auditUpdateProcessGroupInputPortConfiguration( + ProceedingJoinPoint proceedingJoinPoint, String remoteProcessGroupId, + RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroupDAO remoteProcessGroupDAO) throws Throwable { + + final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); + final RemoteGroupPort remoteProcessGroupPort = remoteProcessGroup.getInputPort(remoteProcessGroupPortDto.getId()); + + return auditUpdateProcessGroupPortConfiguration(proceedingJoinPoint, remoteProcessGroupPortDto, remoteProcessGroup, remoteProcessGroupPort); + } + + /** + * Audits the update of remote process group output port configuration. + * + * @param proceedingJoinPoint join point + * @param remoteProcessGroupPortDto dto + * @param remoteProcessGroupDAO dao + * @return group + * @throws Throwable ex + */ + @Around("within(org.apache.nifi.web.dao.RemoteProcessGroupDAO+) && " + + "execution(org.apache.nifi.remote.RemoteGroupPort updateRemoteProcessGroupOutputPort(java.lang.String, org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO)) && " + + "args(remoteProcessGroupId, remoteProcessGroupPortDto) && " + + "target(remoteProcessGroupDAO)") + public RemoteGroupPort auditUpdateProcessGroupOutputPortConfiguration( + ProceedingJoinPoint proceedingJoinPoint, String remoteProcessGroupId, + RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroupDAO remoteProcessGroupDAO) throws Throwable { + + final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); + final RemoteGroupPort remoteProcessGroupPort = remoteProcessGroup.getOutputPort(remoteProcessGroupPortDto.getId()); + + return auditUpdateProcessGroupPortConfiguration(proceedingJoinPoint, remoteProcessGroupPortDto, remoteProcessGroup, remoteProcessGroupPort); + } + + private FlowChangeRemoteProcessGroupDetails createFlowChangeDetails(RemoteProcessGroup remoteProcessGroup) { + FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails(); + remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri()); + return remoteProcessGroupDetails; + } + + /** * Generates an audit record for the specified remote process group. * @@ -348,8 +476,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { // ensure the user was found if (user != null) { // create the remote process group details - FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails(); - remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri().toString()); + FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = createFlowChangeDetails(remoteProcessGroup); // create the remote process group action action = new FlowChangeAction(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java new file mode 100644 index 0000000000..725e4d4fc4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java @@ -0,0 +1,556 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.audit; + +import org.apache.nifi.action.Action; +import org.apache.nifi.action.Operation; +import org.apache.nifi.action.component.details.RemoteProcessGroupDetails; +import org.apache.nifi.action.details.ActionDetails; +import org.apache.nifi.action.details.ConfigureDetails; +import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserDetails; +import org.apache.nifi.authorization.user.StandardNiFiUser; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.dao.RemoteProcessGroupDAO; +import org.aspectj.lang.ProceedingJoinPoint; +import org.junit.Before; +import org.junit.Test; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContext; +import org.springframework.security.core.context.SecurityContextHolder; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestRemoteProcessGroupAuditor { + + @Before + public void setup() { + + final SecurityContext securityContext = SecurityContextHolder.getContext(); + final Authentication authentication = mock(Authentication.class); + securityContext.setAuthentication(authentication); + final NiFiUser user = new StandardNiFiUser("user-id"); + final NiFiUserDetails userDetail = new NiFiUserDetails(user); + when(authentication.getPrincipal()).thenReturn(userDetail); + + } + + @SuppressWarnings("unchecked") + private Collection updateProcessGroupConfiguration(RemoteProcessGroupDTO inputRPGDTO, RemoteProcessGroup existingRPG) throws Throwable { + final RemoteProcessGroupAuditor auditor = new RemoteProcessGroupAuditor(); + final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class); + final String remoteProcessGroupId = "remote-process-group-id"; + inputRPGDTO.setId(remoteProcessGroupId); + + final String targetUrl = "http://localhost:8080/nifi"; + when(existingRPG.getTargetUri()).thenReturn(targetUrl); + + final RemoteProcessGroupDAO remoteProcessGroupDAO = mock(RemoteProcessGroupDAO.class); + when(remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId)) + .thenReturn(existingRPG); + + // Setup updatedRPG mock based on inputRPGDTO. + final RemoteProcessGroup updatedRPG = mock(RemoteProcessGroup.class); + when(updatedRPG.getIdentifier()).thenReturn(remoteProcessGroupId); + when(updatedRPG.isTransmitting()).thenReturn(inputRPGDTO.isTransmitting()); + when(updatedRPG.getCommunicationsTimeout()).thenReturn(inputRPGDTO.getCommunicationsTimeout()); + when(updatedRPG.getYieldDuration()).thenReturn(inputRPGDTO.getYieldDuration()); + when(updatedRPG.getTransportProtocol()) + .thenReturn(SiteToSiteTransportProtocol.valueOf(inputRPGDTO.getTransportProtocol())); + when(updatedRPG.getProxyHost()).thenReturn(inputRPGDTO.getProxyHost()); + when(updatedRPG.getProxyPort()).thenReturn(inputRPGDTO.getProxyPort()); + when(updatedRPG.getProxyUser()).thenReturn(inputRPGDTO.getProxyUser()); + when(updatedRPG.getProxyPassword()).thenReturn(inputRPGDTO.getProxyPassword()); + + when(joinPoint.proceed()).thenReturn(updatedRPG); + + // Capture added actions so that those can be asserted later. + final AuditService auditService = mock(AuditService.class); + final AtomicReference> addedActions = new AtomicReference<>(); + doAnswer(invocation -> { + Collection actions = invocation.getArgumentAt(0, Collection.class); + addedActions.set(actions); + return null; + }).when(auditService).addActions(any()); + + auditor.setAuditService(auditService); + + auditor.auditUpdateProcessGroupConfiguration(joinPoint, inputRPGDTO, remoteProcessGroupDAO); + + + final Collection actions = addedActions.get(); + + // Assert common action values. + if (actions != null) { + actions.forEach(action -> { + assertEquals(remoteProcessGroupId, action.getSourceId()); + assertEquals("user-id", action.getUserIdentity()); + assertEquals(targetUrl, ((RemoteProcessGroupDetails)action.getComponentDetails()).getUri()); + assertNotNull(action.getTimestamp()); + }); + } + + return actions; + } + + private RemoteProcessGroup defaultRemoteProcessGroup() { + final RemoteProcessGroup existingRPG = mock(RemoteProcessGroup.class); + when(existingRPG.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW); + when(existingRPG.isTransmitting()).thenReturn(false); + when(existingRPG.getProxyPort()).thenReturn(null); + return existingRPG; + } + + private RemoteProcessGroupDTO defaultInput() { + final RemoteProcessGroupDTO inputRPGDTO = new RemoteProcessGroupDTO(); + inputRPGDTO.setTransportProtocol("RAW"); + inputRPGDTO.setTransmitting(false); + return inputRPGDTO; + } + + @Test + public void testEnableTransmission() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.isTransmitting()).thenReturn(false); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setTransmitting(true); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Start, action.getOperation()); + assertNull(action.getActionDetails()); + + } + + @Test + public void testDisableTransmission() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.isTransmitting()).thenReturn(true); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setTransmitting(false); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Stop, action.getOperation()); + assertNull(action.getActionDetails()); + + } + + @Test + public void testConfigureCommunicationsTimeout() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.getCommunicationsTimeout()).thenReturn("30 sec"); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setCommunicationsTimeout("31 sec"); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Communications Timeout", + existingRPG.getCommunicationsTimeout(), inputRPGDTO.getCommunicationsTimeout()); + + } + + private void assertConfigureDetails(final ActionDetails actionDetails, final String name, + final Object previousValue, final Object value) { + final ConfigureDetails configureDetails = (ConfigureDetails) actionDetails; + assertEquals(name, configureDetails.getName()); + assertEquals(previousValue != null ? previousValue.toString() : "", configureDetails.getPreviousValue()); + assertEquals(value != null ? value.toString() : "", configureDetails.getValue()); + } + + @Test + public void testConfigureYieldDuration() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.getYieldDuration()).thenReturn("10 sec"); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setYieldDuration("11 sec"); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Yield Duration", + existingRPG.getYieldDuration(), inputRPGDTO.getYieldDuration()); + + } + + @Test + public void testConfigureTransportProtocol() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setTransportProtocol("HTTP"); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Transport Protocol", + existingRPG.getTransportProtocol().name(), inputRPGDTO.getTransportProtocol()); + + } + + @Test + public void testConfigureProxyHost() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyHost("proxy.example.com"); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy Host", + existingRPG.getProxyHost(), inputRPGDTO.getProxyHost()); + + } + + @Test + public void testConfigureProxyHostUpdate() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.getProxyHost()).thenReturn("proxy1.example.com"); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyHost("proxy2.example.com"); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy Host", + existingRPG.getProxyHost(), inputRPGDTO.getProxyHost()); + + } + + @Test + public void testConfigureProxyHostClear() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.getProxyHost()).thenReturn("proxy.example.com"); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyHost(""); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy Host", + existingRPG.getProxyHost(), inputRPGDTO.getProxyHost()); + + } + + @Test + public void testConfigureProxyPort() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyPort(3128); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy Port", + existingRPG.getProxyPort(), inputRPGDTO.getProxyPort()); + + } + + + @Test + public void testConfigureProxyPortClear() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.getProxyPort()).thenReturn(3128); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyPort(null); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy Port", + existingRPG.getProxyPort(), inputRPGDTO.getProxyPort()); + + } + + @Test + public void testConfigureProxyUser() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyUser("proxy-user"); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy User", + existingRPG.getProxyUser(), inputRPGDTO.getProxyUser()); + + } + + @Test + public void testConfigureProxyUserClear() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.getProxyUser()).thenReturn("proxy-user"); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyUser(null); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy User", + existingRPG.getProxyUser(), inputRPGDTO.getProxyUser()); + + } + + @Test + public void testConfigureProxyPassword() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyPassword("proxy-password"); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy Password","", SENSITIVE_VALUE_MASK); + + } + + @Test + public void testConfigureProxyPasswordClear() throws Throwable { + + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + when(existingRPG.getProxyPassword()).thenReturn("proxy-password"); + + final RemoteProcessGroupDTO inputRPGDTO = defaultInput(); + inputRPGDTO.setProxyPassword(null); + + final Collection actions = updateProcessGroupConfiguration(inputRPGDTO, existingRPG); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Proxy Password", SENSITIVE_VALUE_MASK, ""); + + } + + @SuppressWarnings("unchecked") + private Collection updateProcessGroupInputPortConfiguration(RemoteProcessGroupPortDTO inputRPGPortDTO, RemoteGroupPort existingRPGPort) throws Throwable { + final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup(); + final RemoteProcessGroupAuditor auditor = new RemoteProcessGroupAuditor(); + final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class); + final String remoteProcessGroupId = "remote-process-group-id"; + inputRPGPortDTO.setId(remoteProcessGroupId); + + final String targetUrl = "http://localhost:8080/nifi"; + when(existingRPG.getIdentifier()).thenReturn(remoteProcessGroupId); + when(existingRPG.getTargetUri()).thenReturn(targetUrl); + + final RemoteProcessGroupDAO remoteProcessGroupDAO = mock(RemoteProcessGroupDAO.class); + when(remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId)) + .thenReturn(existingRPG); + + when(existingRPG.getInputPort(eq(inputRPGPortDTO.getId()))).thenReturn(existingRPGPort); + + // Setup updatedRPGPort mock based on inputRPGPortDTO. + final RemoteGroupPort updatedRPGPort = mock(RemoteGroupPort.class); + final String portName = existingRPGPort.getName(); + when(updatedRPGPort.getName()).thenReturn(portName); + if (inputRPGPortDTO.isTransmitting() != null) { + when(updatedRPGPort.isRunning()).thenReturn(inputRPGPortDTO.isTransmitting()); + } + when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount()); + when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression()); + + when(joinPoint.proceed()).thenReturn(updatedRPGPort); + + // Capture added actions so that those can be asserted later. + final AuditService auditService = mock(AuditService.class); + final AtomicReference> addedActions = new AtomicReference<>(); + doAnswer(invocation -> { + Collection actions = invocation.getArgumentAt(0, Collection.class); + addedActions.set(actions); + return null; + }).when(auditService).addActions(any()); + + auditor.setAuditService(auditService); + + auditor.auditUpdateProcessGroupInputPortConfiguration(joinPoint, remoteProcessGroupId, inputRPGPortDTO, remoteProcessGroupDAO); + + + final Collection actions = addedActions.get(); + + // Assert common action values. + if (actions != null) { + actions.forEach(action -> { + assertEquals(remoteProcessGroupId, action.getSourceId()); + assertEquals("user-id", action.getUserIdentity()); + assertEquals(targetUrl, ((RemoteProcessGroupDetails)action.getComponentDetails()).getUri()); + assertNotNull(action.getTimestamp()); + }); + } + + return actions; + } + + private RemoteGroupPort defaultRemoteGroupPort() { + final RemoteGroupPort existingRPGPort = mock(RemoteGroupPort.class); + when(existingRPGPort.isRunning()).thenReturn(false); + when(existingRPGPort.getMaxConcurrentTasks()).thenReturn(1); + when(existingRPGPort.isUseCompression()).thenReturn(false); + return existingRPGPort; + } + + private RemoteProcessGroupPortDTO defaultRemoteProcessGroupPortDTO() { + final RemoteProcessGroupPortDTO inputRPGPortDTO = new RemoteProcessGroupPortDTO(); + inputRPGPortDTO.setConcurrentlySchedulableTaskCount(1); + inputRPGPortDTO.setUseCompression(false); + return inputRPGPortDTO; + } + + @Test + public void testEnablePort() throws Throwable { + + final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort(); + when(existingRPGPort.getName()).thenReturn("input-port-1"); + when(existingRPGPort.isRunning()).thenReturn(false); + + final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO(); + inputRPGPortDTO.setTransmitting(true); + + final Collection actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Transmission", "disabled", "enabled"); + + } + + @Test + public void testDisablePort() throws Throwable { + + final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort(); + when(existingRPGPort.getName()).thenReturn("input-port-1"); + when(existingRPGPort.isRunning()).thenReturn(true); + + final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO(); + inputRPGPortDTO.setTransmitting(false); + + final Collection actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Transmission", "enabled", "disabled"); + + } + + @Test + public void testConfigurePortConcurrency() throws Throwable { + + final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort(); + when(existingRPGPort.getName()).thenReturn("input-port-1"); + + final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO(); + inputRPGPortDTO.setConcurrentlySchedulableTaskCount(2); + + final Collection actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Concurrent Tasks", "1", "2"); + + } + + @Test + public void testConfigurePortCompression() throws Throwable { + + final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort(); + when(existingRPGPort.getName()).thenReturn("input-port-1"); + + final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO(); + inputRPGPortDTO.setUseCompression(true); + + final Collection actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort); + + assertEquals(1, actions.size()); + final Action action = actions.iterator().next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Compressed", "false", "true"); + + } +}