diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java index 5d631ed331..28999a51cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java @@ -17,43 +17,101 @@ package org.apache.nifi.web.api.dto; -import javax.xml.bind.annotation.XmlType; - import com.wordnik.swagger.annotations.ApiModelProperty; +import javax.xml.bind.annotation.XmlType; +import java.util.Collection; + @XmlType(name = "affectedComponent") public class AffectedComponentDTO { public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR"; public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE"; - private String parentGroupId; - private String componentId; - private String componentType; + private String processGroupId; + private String id; + private String referenceType; + private String name; + private String state; + private Integer activeThreadCount; + + private Collection validationErrors; @ApiModelProperty("The UUID of the Process Group that this component is in") - public String getParentGroupId() { - return parentGroupId; + public String getProcessGroupId() { + return processGroupId; } - public void setParentGroupId(final String parentGroupId) { - this.parentGroupId = parentGroupId; + public void setProcessGroupId(final String processGroupId) { + this.processGroupId = processGroupId; } @ApiModelProperty("The UUID of this component") - public String getComponentId() { - return componentId; + public String getId() { + return id; } - public void setComponentId(final String componentId) { - this.componentId = componentId; + public void setId(final String id) { + this.id = id; } @ApiModelProperty(value = "The type of this component", allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE) - public String getComponentType() { - return componentType; + public String getReferenceType() { + return referenceType; } - public void setComponentType(final String componentType) { - this.componentType = componentType; + public void setReferenceType(final String referenceType) { + this.referenceType = referenceType; + } + + @ApiModelProperty("The name of this component.") + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + /** + * @return scheduled state of the processor referencing a controller service. If this component is another service, this field represents the controller service state + */ + @ApiModelProperty( + value = "The scheduled state of a processor or reporting task referencing a controller service. If this component is another controller " + + "service, this field represents the controller service state." + ) + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + /** + * @return active thread count for the referencing component + */ + @ApiModelProperty( + value = "The number of active threads for the referencing component." + ) + public Integer getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(Integer activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + + /** + * @return Any validation error associated with this component + */ + @ApiModelProperty( + value = "The validation errors for the component." + ) + public Collection getValidationErrors() { + return validationErrors; + } + + public void setValidationErrors(Collection validationErrors) { + this.validationErrors = validationErrors; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceReferencingComponentDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceReferencingComponentDTO.java index 380e6ce3e5..6279ac33f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceReferencingComponentDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceReferencingComponentDTO.java @@ -19,10 +19,10 @@ package org.apache.nifi.web.api.dto; import com.wordnik.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; +import javax.xml.bind.annotation.XmlType; import java.util.Collection; import java.util.Map; import java.util.Set; -import javax.xml.bind.annotation.XmlType; /** * A component referencing a controller service. This can either be another controller service or a processor. Depending on the type of component different properties may be set. @@ -105,11 +105,11 @@ public class ControllerServiceReferencingComponentDTO { } /** - * @return state of the processor referencing a controller service. If this component is another service, this field is blank + * @return scheduled state of the processor referencing a controller service. If this component is another service, this field represents the controller service state */ @ApiModelProperty( - value = "The state of a processor or reporting task referencing a controller service. If this component is another controller " - + "service, this field is blank." + value = "The scheduled state of a processor or reporting task referencing a controller service. If this component is another controller " + + "service, this field represents the controller service state." ) public String getState() { return state; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableDTO.java index c686316451..89ea27a0b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableDTO.java @@ -17,19 +17,18 @@ package org.apache.nifi.web.api.dto; -import java.util.HashSet; -import java.util.Set; +import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; import javax.xml.bind.annotation.XmlType; - -import com.wordnik.swagger.annotations.ApiModelProperty; +import java.util.Set; @XmlType(name = "variable") public class VariableDTO { private String name; private String value; private String processGroupId; - private Set affectedComponents = new HashSet<>(); + private Set affectedComponents; @ApiModelProperty("The name of the variable") public String getName() { @@ -59,11 +58,11 @@ public class VariableDTO { } @ApiModelProperty(value = "A set of all components that will be affected if the value of this variable is changed", readOnly = true) - public Set getAffectedComponents() { + public Set getAffectedComponents() { return affectedComponents; } - public void setAffectedComponents(Set affectedComponents) { + public void setAffectedComponents(Set affectedComponents) { this.affectedComponents = affectedComponents; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryDTO.java index c106a9a281..8f37532ec9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryDTO.java @@ -17,18 +17,16 @@ package org.apache.nifi.web.api.dto; -import java.util.Set; - -import javax.xml.bind.annotation.XmlType; - +import com.wordnik.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.entity.VariableEntity; -import com.wordnik.swagger.annotations.ApiModelProperty; +import javax.xml.bind.annotation.XmlType; +import java.util.Set; @XmlType(name = "variableRegistry") public class VariableRegistryDTO { private Set variables; - private String groupId; + private String processGroupId; public void setVariables(final Set variables) { this.variables = variables; @@ -39,12 +37,12 @@ public class VariableRegistryDTO { return variables; } - public void setProcessGroupId(final String groupId) { - this.groupId = groupId; + public void setProcessGroupId(final String processGroupId) { + this.processGroupId = processGroupId; } @ApiModelProperty("The UUID of the Process Group that this Variable Registry belongs to") public String getProcessGroupId() { - return groupId; + return processGroupId; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateRequestDTO.java index 06a0dc2d75..e57e30a1d2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateRequestDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateRequestDTO.java @@ -17,27 +17,27 @@ package org.apache.nifi.web.api.dto; -import java.util.Date; -import java.util.List; +import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimestampAdapter; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; import javax.xml.bind.annotation.XmlType; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -import org.apache.nifi.web.api.dto.util.TimestampAdapter; - -import com.wordnik.swagger.annotations.ApiModelProperty; +import java.util.Date; +import java.util.List; +import java.util.Set; @XmlType(name = "variableRegistryUpdateRequest") public class VariableRegistryUpdateRequestDTO { private String requestId; private String processGroupId; private String uri; - private Date submissionTime = new Date(); - private Date lastUpdated = new Date(); + private Date submissionTime; + private Date lastUpdated; private boolean complete = false; private String failureReason; private List updateSteps; - + private Set affectedComponents; @ApiModelProperty("The unique ID of the Process Group that the variable registry belongs to") public String getProcessGroupId() { @@ -112,4 +112,13 @@ public class VariableRegistryUpdateRequestDTO { public void setFailureReason(String reason) { this.failureReason = reason; } + + @ApiModelProperty(value = "A set of all components that will be affected if the value of this variable is changed", readOnly = true) + public Set getAffectedComponents() { + return affectedComponents; + } + + public void setAffectedComponents(Set affectedComponents) { + this.affectedComponents = affectedComponents; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java new file mode 100644 index 0000000000..0f28f73e4a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.AffectedComponentDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a response to the API. + * This particular entity holds a reference to component that references a variable. + */ +@XmlRootElement(name = "affectComponentEntity") +public class AffectedComponentEntity extends ComponentEntity implements Permissible { + + private AffectedComponentDTO component; + + /** + * @return variable referencing components that is being serialized + */ + public AffectedComponentDTO getComponent() { + return component; + } + + public void setComponent(AffectedComponentDTO component) { + this.component = component; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryEntity.java index d8764530bf..1b7da0c17d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryEntity.java @@ -17,16 +17,15 @@ package org.apache.nifi.web.api.entity; -import javax.xml.bind.annotation.XmlRootElement; - +import com.wordnik.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; -import com.wordnik.swagger.annotations.ApiModelProperty; +import javax.xml.bind.annotation.XmlRootElement; @XmlRootElement(name = "variableRegistryEntity") public class VariableRegistryEntity extends Entity { - private RevisionDTO groupRevision; + private RevisionDTO processGroupRevision; private VariableRegistryDTO variableRegistry; @@ -41,10 +40,10 @@ public class VariableRegistryEntity extends Entity { @ApiModelProperty("The revision of the Process Group that the Variable Registry belongs to") public RevisionDTO getProcessGroupRevision() { - return groupRevision; + return processGroupRevision; } - public void setProcessGroupRevision(RevisionDTO revision) { - this.groupRevision = revision; + public void setProcessGroupRevision(RevisionDTO processGroupRevision) { + this.processGroupRevision = processGroupRevision; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryUpdateRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryUpdateRequestEntity.java index 77257afec3..bfeb9ce07b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryUpdateRequestEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryUpdateRequestEntity.java @@ -17,16 +17,15 @@ package org.apache.nifi.web.api.entity; -import javax.xml.bind.annotation.XmlRootElement; - +import com.wordnik.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.VariableRegistryUpdateRequestDTO; -import com.wordnik.swagger.annotations.ApiModelProperty; +import javax.xml.bind.annotation.XmlRootElement; @XmlRootElement(name = "variableRegistryUpdateRequestEntity") public class VariableRegistryUpdateRequestEntity extends Entity { - private VariableRegistryUpdateRequestDTO requestDto; + private VariableRegistryUpdateRequestDTO request; private RevisionDTO processGroupRevision; @ApiModelProperty("The revision for the Process Group that owns this variable registry.") @@ -39,11 +38,11 @@ public class VariableRegistryUpdateRequestEntity extends Entity { } @ApiModelProperty("The Variable Registry Update Request") - public VariableRegistryUpdateRequestDTO getRequestDto() { - return requestDto; + public VariableRegistryUpdateRequestDTO getRequest() { + return request; } - public void setRequestDto(VariableRegistryUpdateRequestDTO requestDto) { - this.requestDto = requestDto; + public void setRequest(VariableRegistryUpdateRequestDTO request) { + this.request = request; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index c102746013..fa23603937 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -16,16 +16,6 @@ */ package org.apache.nifi.cluster.coordination.http; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import javax.ws.rs.core.StreamingOutput; - import org.apache.nifi.cluster.coordination.http.endpoints.AccessPolicyEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger; @@ -79,6 +69,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.UserEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.UserGroupEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.UserGroupsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.UsersEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.VariableRegistryEndpointMerger; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.stream.io.NullOutputStream; @@ -87,6 +78,15 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + public class StandardHttpResponseMapper implements HttpResponseMapper { private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class); @@ -154,6 +154,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper { endpointMergers.add(new UserGroupEndpointMerger()); endpointMergers.add(new AccessPolicyEndpointMerger()); endpointMergers.add(new SearchUsersEndpointMerger()); + endpointMergers.add(new VariableRegistryEndpointMerger()); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VariableRegistryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VariableRegistryEndpointMerger.java new file mode 100644 index 0000000000..0420911945 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VariableRegistryEndpointMerger.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.AffectedComponentEntityMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.VariableDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import org.apache.nifi.web.api.entity.VariableEntity; +import org.apache.nifi.web.api.entity.VariableRegistryEntity; + +import java.net.URI; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class VariableRegistryEndpointMerger extends AbstractSingleEntityEndpoint implements EndpointResponseMerger { + public static final Pattern VARIABLE_REGISTRY_UPDATE_REQUEST_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/variable-registry"); + + private final AffectedComponentEntityMerger affectedComponentEntityMerger = new AffectedComponentEntityMerger(); + + @Override + public boolean canHandle(final URI uri, final String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (VARIABLE_REGISTRY_UPDATE_REQUEST_URI_PATTERN.matcher(uri.getPath()).matches())) { + return true; + } + + return false; + } + + @Override + protected Class getEntityClass() { + return VariableRegistryEntity.class; + } + + @Override + protected void mergeResponses(final VariableRegistryEntity clientEntity, final Map entityMap, + final Set successfulResponses, final Set problematicResponses) { + + final VariableRegistryDTO clientVariableRegistry = clientEntity.getVariableRegistry(); + final Set clientVariableEntities = clientVariableRegistry.getVariables(); + + if (clientVariableEntities != null) { + for (final Iterator i = clientVariableEntities.iterator(); i.hasNext();) { + final VariableEntity clientVariableEntity = i.next(); + final VariableDTO clientVariable = clientVariableEntity.getVariable(); + + final Map> nodeAffectedComponentEntities = new HashMap<>(); + + boolean retainClientVariable = true; + for (final Map.Entry nodeEntry : entityMap.entrySet()) { + final VariableRegistryEntity nodeVariableRegistry = nodeEntry.getValue(); + final Set nodeVariableEntities = nodeVariableRegistry.getVariableRegistry().getVariables(); + + // if this node has no variables, then the current client variable should be removed + if (nodeVariableEntities == null) { + retainClientVariable = false; + break; + } + + boolean variableFound = false; + for (final VariableEntity nodeVariableEntity : nodeVariableEntities) { + final VariableDTO nodeVariable = nodeVariableEntity.getVariable(); + + // identify the current clientVariable for each node + if (clientVariable.getProcessGroupId().equals(nodeVariable.getProcessGroupId()) && clientVariable.getName().equals(nodeVariable.getName())) { + variableFound = true; + + if (Boolean.FALSE.equals(nodeVariableEntity.getCanWrite())) { + clientVariableEntity.setCanWrite(false); + } + + nodeAffectedComponentEntities.put(nodeEntry.getKey(), nodeVariableEntity.getVariable().getAffectedComponents()); + break; + } + } + + if (!variableFound) { + retainClientVariable = false; + break; + } + } + + if (!retainClientVariable) { + i.remove(); + } else { + final Set clientAffectedComponentEntities = clientVariableEntity.getVariable().getAffectedComponents(); + affectedComponentEntityMerger.mergeAffectedComponents(clientAffectedComponentEntities, nodeAffectedComponentEntities); + } + } + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/AffectedComponentEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/AffectedComponentEntityMerger.java new file mode 100644 index 0000000000..60a605dacd --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/AffectedComponentEntityMerger.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; +import org.apache.nifi.web.api.dto.PermissionsDTO; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class AffectedComponentEntityMerger { + + public void mergeAffectedComponents(final Set affectedComponents, final Map> affectedComponentMap) { + + final Map activeThreadCounts = new HashMap<>(); + final Map states = new HashMap<>(); + final Map canReads = new HashMap<>(); + + for (final Map.Entry> nodeEntry : affectedComponentMap.entrySet()) { + final Set nodeAffectedComponents = nodeEntry.getValue(); + + // go through all the nodes referencing components + if (nodeAffectedComponents != null) { + for (final AffectedComponentEntity nodeAffectedComponentEntity : nodeAffectedComponents) { + final AffectedComponentDTO nodeAffectedComponent = nodeAffectedComponentEntity.getComponent(); + + if (nodeAffectedComponentEntity.getPermissions().getCanRead()) { + // handle active thread counts + if (nodeAffectedComponent.getActiveThreadCount() != null && nodeAffectedComponent.getActiveThreadCount() > 0) { + final Integer current = activeThreadCounts.get(nodeAffectedComponent.getId()); + if (current == null) { + activeThreadCounts.put(nodeAffectedComponent.getId(), nodeAffectedComponent.getActiveThreadCount()); + } else { + activeThreadCounts.put(nodeAffectedComponent.getId(), nodeAffectedComponent.getActiveThreadCount() + current); + } + } + + // handle controller service state + final String state = states.get(nodeAffectedComponent.getId()); + if (state == null) { + if (ControllerServiceState.DISABLING.name().equals(nodeAffectedComponent.getState())) { + states.put(nodeAffectedComponent.getId(), ControllerServiceState.DISABLING.name()); + } else if (ControllerServiceState.ENABLING.name().equals(nodeAffectedComponent.getState())) { + states.put(nodeAffectedComponent.getId(), ControllerServiceState.ENABLING.name()); + } + } + } + + // handle read permissions + final PermissionsDTO mergedPermissions = canReads.get(nodeAffectedComponentEntity.getId()); + final PermissionsDTO permissions = nodeAffectedComponentEntity.getPermissions(); + if (permissions != null) { + if (mergedPermissions == null) { + canReads.put(nodeAffectedComponentEntity.getId(), permissions); + } else { + PermissionsDtoMerger.mergePermissions(mergedPermissions, permissions); + } + } + } + } + } + + // go through each affected components + if (affectedComponents != null) { + for (final AffectedComponentEntity affectedComponent : affectedComponents) { + final PermissionsDTO permissions = canReads.get(affectedComponent.getId()); + if (permissions != null && permissions.getCanRead() != null && permissions.getCanRead()) { + final Integer activeThreadCount = activeThreadCounts.get(affectedComponent.getId()); + if (activeThreadCount != null) { + affectedComponent.getComponent().setActiveThreadCount(activeThreadCount); + } + + final String state = states.get(affectedComponent.getId()); + if (state != null) { + affectedComponent.getComponent().setState(state); + } + } else { + affectedComponent.setPermissions(permissions); + affectedComponent.setComponent(null); + } + } + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 2b7b51d848..1754cf7bdf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -16,23 +16,7 @@ */ package org.apache.nifi.groups; -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - +import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -76,6 +60,7 @@ import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.StandardProcessContext; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; @@ -87,7 +72,22 @@ import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; public final class StandardProcessGroup implements ProcessGroup { @@ -2651,7 +2651,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Set affected = new HashSet<>(); // Determine any Processors that references the variable - for (final ProcessorNode processor : findAllProcessors()) { + for (final ProcessorNode processor : getProcessors()) { for (final VariableImpact impact : getVariableImpact(processor)) { if (impact.isImpacted(variableName)) { affected.add(processor); @@ -2662,7 +2662,7 @@ public final class StandardProcessGroup implements ProcessGroup { // Determine any Controller Service that references the variable. If Service A references a variable, // then that means that any other component that references that service is also affected, so recursively // find any references to that service and add it. - for (final ControllerServiceNode service : findAllControllerServices()) { + for (final ControllerServiceNode service : getControllerServices(false)) { for (final VariableImpact impact : getVariableImpact(service)) { if (impact.isImpacted(variableName)) { affected.add(service); @@ -2673,6 +2673,18 @@ public final class StandardProcessGroup implements ProcessGroup { } } + // For any child Process Group that does not override the variable, also include its references. + // If a child group has a value for the same variable, though, then that means that the child group + // is overriding the variable and its components are actually referencing a different variable. + for (final ProcessGroup childGroup : getProcessGroups()) { + final ComponentVariableRegistry childRegistry = childGroup.getVariableRegistry(); + final VariableDescriptor descriptor = childRegistry.getVariableKey(variableName); + final boolean overridden = childRegistry.getVariableMap().containsKey(descriptor); + if (!overridden) { + affected.addAll(childGroup.getComponentsAffectedByVariable(variableName)); + } + } + return affected; } @@ -2695,7 +2707,11 @@ public final class StandardProcessGroup implements ProcessGroup { } private List getVariableImpact(final ConfiguredComponent component) { - return component.getProperties().values().stream() + return component.getProperties().keySet().stream() + .map(descriptor -> { + final String configuredVal = component.getProperty(descriptor); + return configuredVal == null ? descriptor.getDefaultValue() : configuredVal; + }) .map(propVal -> Query.prepare(propVal).getVariableImpact()) .collect(Collectors.toList()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateRequest.java index 82d46832fc..72f9a7a4e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/variable/VariableRegistryUpdateRequest.java @@ -17,17 +17,28 @@ package org.apache.nifi.registry.variable; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; + import java.util.Date; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; public class VariableRegistryUpdateRequest { private final String requestId; private final String processGroupId; + private final NiFiUser user; private volatile Date submissionTime = new Date(); private volatile Date lastUpdated = new Date(); private volatile boolean complete = false; private final AtomicReference failureReason = new AtomicReference<>(); + private RevisionDTO processGroupRevision; + private Map affectedComponents; private final VariableRegistryUpdateStep identifyComponentsStep = new VariableRegistryUpdateStep("Identifying components affected"); private final VariableRegistryUpdateStep stopProcessors = new VariableRegistryUpdateStep("Stopping affected Processors"); @@ -36,16 +47,17 @@ public class VariableRegistryUpdateRequest { private final VariableRegistryUpdateStep enableServices = new VariableRegistryUpdateStep("Re-Enabling affected Controller Services"); private final VariableRegistryUpdateStep startProcessors = new VariableRegistryUpdateStep("Restarting affected Processors"); - public VariableRegistryUpdateRequest(final String requestId, final String processGroupId) { + public VariableRegistryUpdateRequest(final String requestId, final String processGroupId, final Set affectedComponents, final NiFiUser user) { this.requestId = requestId; this.processGroupId = processGroupId; + this.affectedComponents = affectedComponents.stream().collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity())); + this.user = user; } public String getProcessGroupId() { return processGroupId; } - public String getRequestId() { return requestId; } @@ -54,6 +66,10 @@ public class VariableRegistryUpdateRequest { return submissionTime; } + public NiFiUser getUser() { + return user; + } + public Date getLastUpdated() { return lastUpdated; } @@ -102,6 +118,18 @@ public class VariableRegistryUpdateRequest { this.failureReason.set(reason); } + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(RevisionDTO processGroupRevision) { + this.processGroupRevision = processGroupRevision; + } + + public Map getAffectedComponents() { + return affectedComponents; + } + public void cancel() { this.failureReason.compareAndSet(null, "Update was cancelled"); this.complete = true; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 6fed58ead2..faa8c0ecf1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,13 +16,6 @@ */ package org.apache.nifi.web; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; - import org.apache.nifi.authorization.AuthorizeAccess; import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.user.NiFiUser; @@ -77,6 +70,7 @@ import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.entity.AccessPolicyEntity; import org.apache.nifi.web.api.entity.ActionEntity; import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; @@ -108,6 +102,13 @@ import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; import org.apache.nifi.web.api.entity.VariableRegistryEntity; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + /** * Defines the NiFiServiceFacade interface. */ @@ -518,9 +519,10 @@ public interface NiFiServiceFacade { * Gets all the Processor transfer objects for this controller. * * @param groupId group + * @param includeDescendants if processors from descendent groups should be included * @return List of all the Processor transfer object */ - Set getProcessors(String groupId); + Set getProcessors(String groupId, boolean includeDescendants); /** * Verifies the specified processor can be updated. @@ -925,12 +927,21 @@ public interface NiFiServiceFacade { VariableRegistryEntity updateVariableRegistry(NiFiUser user, Revision revision, VariableRegistryDTO variableRegistryDto); /** - * Determines which components will be affected by updating the given Variable Registry + * Determines which components will be affected by updating the given Variable Registry. * * @param variableRegistryDto the variable registry * @return the components that will be affected */ - Set getComponentsAffectedByVariableRegistryUpdate(VariableRegistryDTO variableRegistryDto); + Set getComponentsAffectedByVariableRegistryUpdate(VariableRegistryDTO variableRegistryDto); + + /** + * Determines which components are active and will be affected by updating the given Variable Registry. These active components + * are needed to authorize the request and deactivate prior to changing the variables. + * + * @param variableRegistryDto the variable registry + * @return the components that will be affected + */ + Set getActiveComponentsAffectedByVariableRegistryUpdate(VariableRegistryDTO variableRegistryDto); /** * Gets all process groups in the specified parent group. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 7cd5732f0f..9caabd6050 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -161,6 +161,7 @@ import org.apache.nifi.web.api.entity.AccessPolicyEntity; import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; import org.apache.nifi.web.api.entity.ActionEntity; import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ComponentReferenceEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; @@ -790,7 +791,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { + public Set getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); if (group == null) { throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); @@ -827,6 +828,29 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return affectedComponentDtos; } + @Override + public Set getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { + final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); + if (group == null) { + throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); + } + + final Map variableMap = new HashMap<>(); + variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null + .map(VariableEntity::getVariable) + .forEach(var -> variableMap.put(var.getName(), var.getValue())); + + final Set affectedComponentEntities = new HashSet<>(); + + final Set updatedVariableNames = getUpdatedVariables(group, variableMap); + for (final String variableName : updatedVariableNames) { + final Set affectedComponents = group.getComponentsAffectedByVariable(variableName); + affectedComponentEntities.addAll(dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager)); + } + + return affectedComponentEntities; + } + private Set getUpdatedVariables(final ProcessGroup group, final Map newVariableValues) { final Set updatedVariableNames = new HashSet<>(); @@ -856,7 +880,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionUpdate snapshot = updateComponent(user, revision, processGroupNode, () -> processGroupDAO.updateVariableRegistry(variableRegistryDto), - processGroup -> dtoFactory.createVariableRegistryDto(processGroup)); + processGroup -> dtoFactory.createVariableRegistryDto(processGroup, revisionManager)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); @@ -2498,8 +2522,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getProcessors(final String groupId) { - final Set processors = processorDAO.getProcessors(groupId); + public Set getProcessors(final String groupId, final boolean includeDescendants) { + final Set processors = processorDAO.getProcessors(groupId, includeDescendants); return processors.stream() .map(processor -> createProcessorEntity(processor)) .collect(Collectors.toSet()); @@ -3231,7 +3255,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) { - final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup); + final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup, revisionManager); final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); @@ -3240,7 +3264,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { while (parent != null) { final PermissionsDTO parentPerms = dtoFactory.createPermissionsDto(parent); if (Boolean.TRUE.equals(parentPerms.getCanRead())) { - final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent); + final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent, revisionManager); final Set parentVariables = parentRegistryDto.getVariables(); registryDto.getVariables().addAll(parentVariables); } @@ -3260,7 +3284,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { throw new ResourceNotFoundException("Could not find group with ID " + groupId); } - final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup); + final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup, revisionManager); final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 0b634a144f..0e574e0fdc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -16,6 +16,119 @@ */ package org.apache.nifi.web.api; +import com.sun.jersey.api.core.ResourceContext; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import com.sun.jersey.multipart.FormDataParam; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.AuthorizableLookup; +import org.apache.nifi.authorization.AuthorizeAccess; +import org.apache.nifi.authorization.AuthorizeControllerServiceReference; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.ComponentAuthorizable; +import org.apache.nifi.authorization.ProcessGroupAuthorizable; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.SnippetAuthorizable; +import org.apache.nifi.authorization.TemplateContentsAuthorizable; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserDetails; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.serialization.FlowEncodingVersion; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; +import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.util.BundleUtils; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; +import org.apache.nifi.web.api.dto.BundleDTO; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.DtoFactory; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.PositionDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.TemplateDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ConnectionsEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; +import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity; +import org.apache.nifi.web.api.entity.FlowEntity; +import org.apache.nifi.web.api.entity.FunnelEntity; +import org.apache.nifi.web.api.entity.FunnelsEntity; +import org.apache.nifi.web.api.entity.InputPortsEntity; +import org.apache.nifi.web.api.entity.InstantiateTemplateRequestEntity; +import org.apache.nifi.web.api.entity.LabelEntity; +import org.apache.nifi.web.api.entity.LabelsEntity; +import org.apache.nifi.web.api.entity.OutputPortsEntity; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupsEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProcessorsEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; +import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; +import org.apache.nifi.web.api.entity.TemplateEntity; +import org.apache.nifi.web.api.entity.VariableRegistryEntity; +import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; +import org.apache.nifi.web.security.token.NiFiAuthenticationToken; +import org.apache.nifi.web.util.Pause; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriInfo; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; +import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; @@ -40,122 +153,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriInfo; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import javax.xml.transform.stream.StreamSource; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.authorization.AuthorizableLookup; -import org.apache.nifi.authorization.AuthorizeControllerServiceReference; -import org.apache.nifi.authorization.Authorizer; -import org.apache.nifi.authorization.ComponentAuthorizable; -import org.apache.nifi.authorization.ProcessGroupAuthorizable; -import org.apache.nifi.authorization.RequestAction; -import org.apache.nifi.authorization.SnippetAuthorizable; -import org.apache.nifi.authorization.TemplateContentsAuthorizable; -import org.apache.nifi.authorization.resource.Authorizable; -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.bundle.BundleCoordinate; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.controller.ScheduledState; -import org.apache.nifi.controller.serialization.FlowEncodingVersion; -import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.framework.security.util.SslContextFactory; -import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; -import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; -import org.apache.nifi.remote.util.SiteToSiteRestApiClient; -import org.apache.nifi.util.BundleUtils; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.ResourceNotFoundException; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.api.dto.AffectedComponentDTO; -import org.apache.nifi.web.api.dto.BundleDTO; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.DtoFactory; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.PositionDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.dto.TemplateDTO; -import org.apache.nifi.web.api.dto.VariableRegistryDTO; -import org.apache.nifi.web.api.dto.flow.FlowDTO; -import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; -import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; -import org.apache.nifi.web.api.entity.ConnectionEntity; -import org.apache.nifi.web.api.entity.ConnectionsEntity; -import org.apache.nifi.web.api.entity.ControllerServiceEntity; -import org.apache.nifi.web.api.entity.ControllerServicesEntity; -import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; -import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity; -import org.apache.nifi.web.api.entity.FlowEntity; -import org.apache.nifi.web.api.entity.FunnelEntity; -import org.apache.nifi.web.api.entity.FunnelsEntity; -import org.apache.nifi.web.api.entity.InputPortsEntity; -import org.apache.nifi.web.api.entity.InstantiateTemplateRequestEntity; -import org.apache.nifi.web.api.entity.LabelEntity; -import org.apache.nifi.web.api.entity.LabelsEntity; -import org.apache.nifi.web.api.entity.OutputPortsEntity; -import org.apache.nifi.web.api.entity.PortEntity; -import org.apache.nifi.web.api.entity.ProcessGroupEntity; -import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; -import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; -import org.apache.nifi.web.api.entity.ProcessGroupsEntity; -import org.apache.nifi.web.api.entity.ProcessorEntity; -import org.apache.nifi.web.api.entity.ProcessorsEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; -import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; -import org.apache.nifi.web.api.entity.TemplateEntity; -import org.apache.nifi.web.api.entity.VariableRegistryEntity; -import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; -import org.apache.nifi.web.util.Pause; -import org.apache.nifi.web.util.WebUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.core.ResourceContext; -import com.sun.jersey.multipart.FormDataParam; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; - /** * RESTful endpoint for managing a Group. */ @@ -455,14 +452,12 @@ public class ProcessGroupResource extends ApplicationResource { throw new IllegalArgumentException("Group ID and Update ID must both be specified."); } - if (isReplicateRequest()) { - return replicate(HttpMethod.GET); - } + final NiFiUser user = NiFiUserUtils.getNiFiUser(); // authorize access serviceFacade.authorizeAccess(lookup -> { final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + processGroup.authorize(authorizer, RequestAction.READ, user); }); final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId); @@ -474,9 +469,14 @@ public class ProcessGroupResource extends ApplicationResource { throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); } + if (!user.equals(request.getUser())) { + throw new IllegalArgumentException("Only the user that submitted the update request can retrieve it."); + } + final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); - entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request)); - entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); + entity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(request)); + entity.setProcessGroupRevision(request.getProcessGroupRevision()); + entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); return generateOkResponse(entity).build(); } @@ -506,15 +506,13 @@ public class ProcessGroupResource extends ApplicationResource { throw new IllegalArgumentException("Group ID and Update ID must both be specified."); } - if (isReplicateRequest()) { - return replicate(HttpMethod.DELETE); - } + final NiFiUser user = NiFiUserUtils.getNiFiUser(); // authorize access serviceFacade.authorizeAccess(lookup -> { final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + processGroup.authorize(authorizer, RequestAction.READ, user); + processGroup.authorize(authorizer, RequestAction.WRITE, user); }); final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId); @@ -526,11 +524,16 @@ public class ProcessGroupResource extends ApplicationResource { throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); } + if (!user.equals(request.getUser())) { + throw new IllegalArgumentException("Only the user that submitted the update request can remove it."); + } + request.cancel(); final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); - entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request)); - entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); + entity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(request)); + entity.setProcessGroupRevision(request.getProcessGroupRevision()); + entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); return generateOkResponse(entity).build(); } @@ -634,11 +637,11 @@ public class ProcessGroupResource extends ApplicationResource { // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service). // 1a. Determine ID's of components // 1b. Determine Revision's of associated components - // 2. Stop All Affected Processors - // 3. Disable All Affected Controller Services + // 2. Stop All Active Affected Processors + // 3. Disable All Active Affected Controller Services // 4. Update the Variables - // 5. Re-Enable all Affected Controller Services (services only, not dependent components) - // 6. Re-Enable all Processors that Depended on the Controller Services + // 5. Re-Enable all previously Active Affected Controller Services (services only, not dependent components) + // 6. Re-Enable all previously Active Processors that Depended on the Controller Services // Determine the affected components (and their associated revisions) final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry()); @@ -647,38 +650,77 @@ public class ProcessGroupResource extends ApplicationResource { throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); } - final Set affectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); + final Set allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); + final Set activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); - final Map> affectedComponentsByType = affectedComponents.stream() - .collect(Collectors.groupingBy(comp -> comp.getComponentType())); + final Map> activeAffectedComponentsByType = activeAffectedComponents.stream() + .collect(Collectors.groupingBy(comp -> comp.getReferenceType())); - final List affectedProcessors = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); - final List affectedServices = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + final List activeAffectedProcessors = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); + final List activeAffectedServices = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // define access authorize for execution below + final AuthorizeAccess authorizeAccess = lookup -> { + final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable(); + groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user); + + // For every component that is affected, the user must have READ permissions and WRITE permissions + // (because this action requires stopping the component). + if (activeAffectedProcessors != null) { + for (final AffectedComponentDTO activeAffectedComponent : activeAffectedProcessors) { + final Authorizable authorizable = lookup.getProcessor(activeAffectedComponent.getId()).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.READ, user); + authorizable.authorize(authorizer, RequestAction.WRITE, user); + } + } + + if (activeAffectedServices != null) { + for (final AffectedComponentDTO activeAffectedComponent : activeAffectedServices) { + final Authorizable authorizable = lookup.getControllerService(activeAffectedComponent.getId()).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.READ, user); + authorizable.authorize(authorizer, RequestAction.WRITE, user); + } + } + }; if (isReplicateRequest()) { + // authorize access + serviceFacade.authorizeAccess(authorizeAccess); + // update the variable registry - final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId); + final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, allAffectedComponents, user); updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); final URI originalUri = getAbsolutePath(); // Submit the task to be run in the background final Runnable taskWrapper = () -> { try { - updateVariableRegistryReplicated(groupId, originalUri, affectedProcessors, affectedServices, updateRequest, requestEntity); + // set the user authentication token + final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user)); + SecurityContextHolder.getContext().setAuthentication(authentication); + + updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity); } catch (final Exception e) { logger.error("Failed to update variable registry", e); + + updateRequest.setComplete(true); updateRequest.setFailureReason("An unexpected error has occurred: " + e); + } finally { + // clear the authentication token + SecurityContextHolder.getContext().setAuthentication(null); } }; variableRegistryThreadPool.submit(taskWrapper); final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity(); - responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); - responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); + responseEntity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); + responseEntity.setProcessGroupRevision(updateRequest.getProcessGroupRevision()); + responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); - final URI location = URI.create(responseEntity.getRequestDto().getUri()); + final URI location = URI.create(responseEntity.getRequest().getUri()); return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); } @@ -688,34 +730,10 @@ public class ProcessGroupResource extends ApplicationResource { serviceFacade, requestEntity, requestRevision, - lookup -> { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - - final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable(); - groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user); - - // For every component that is affected, the user must have READ permissions and WRITE permissions - // (because this action requires stopping the component). - if (affectedProcessors != null) { - for (final AffectedComponentDTO affectedComponent : affectedProcessors) { - final Authorizable authorizable = lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable(); - authorizable.authorize(authorizer, RequestAction.READ, user); - authorizable.authorize(authorizer, RequestAction.WRITE, user); - } - } - - if (affectedServices != null) { - for (final AffectedComponentDTO affectedComponent : affectedServices) { - final Authorizable authorizable = lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable(); - authorizable.authorize(authorizer, RequestAction.READ, user); - authorizable.authorize(authorizer, RequestAction.WRITE, user); - } - } - }, + authorizeAccess, null, - (revision, varRegistryEntity) -> { - return updateVariableRegistryLocal(groupId, affectedProcessors, affectedServices, requestEntity); - }); + (revision, varRegistryEntity) -> updateVariableRegistryLocal(groupId, allAffectedComponents, activeAffectedProcessors, activeAffectedServices, user, requestEntity) + ); } private Pause createPause(final VariableRegistryUpdateRequest updateRequest) { @@ -739,58 +757,49 @@ public class ProcessGroupResource extends ApplicationResource { } private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection affectedProcessors, - final Collection affectedServices, - final VariableRegistryUpdateRequest updateRequest, final VariableRegistryEntity requestEntity) { - - final NiFiProperties properties = getProperties(); - final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties)); - final int connectionTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS); - final int readTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS); - jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeout); - jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeout); - jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE); + final Collection affectedServices, final VariableRegistryUpdateRequest updateRequest, + final VariableRegistryEntity requestEntity) throws InterruptedException, IOException { final Pause pause = createPause(updateRequest); // stop processors if (affectedProcessors != null) { - logger.info("In order to update Variable Registry for Process Group with ID {}, " - + "replicating request to stop {} affected processors", groupId, affectedProcessors.size()); - - scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause, - affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep()); + logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Processors", groupId, affectedProcessors.size()); + scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep()); + } else { + logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId); + updateRequest.getStopProcessorsStep().setComplete(true); } // disable controller services if (affectedServices != null) { - logger.info("In order to update Variable Registry for Process Group with ID {}, " - + "replicating request to stop {} affected Controller Services", groupId, affectedServices.size()); - - activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause, - affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep()); + logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Controller Services", groupId, affectedServices.size()); + activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep()); + } else { + logger.info("In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId); + updateRequest.getDisableServicesStep().setComplete(true); } // apply updates - logger.info("In order to update Variable Registry for Process Group with ID {}, " - + "replicating request to apply updates to variable registry", groupId); - applyVariableRegistryUpdate(groupId, originalUri, jerseyClient, updateRequest, requestEntity); + logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to apply updates to variable registry", groupId); + applyVariableRegistryUpdate(groupId, originalUri, updateRequest, requestEntity); // re-enable controller services if (affectedServices != null) { - logger.info("In order to update Variable Registry for Process Group with ID {}, " - + "replicating request to re-enable {} affected services", groupId, affectedServices.size()); - - activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause, - affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep()); + logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to re-enable {} affected services", groupId, affectedServices.size()); + activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep()); + } else { + logger.info("In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId); + updateRequest.getEnableServicesStep().setComplete(true); } // restart processors if (affectedProcessors != null) { - logger.info("In order to update Variable Registry for Process Group with ID {}, " - + "replicating request to restart {} affected processors", groupId, affectedProcessors.size()); - - scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause, - affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep()); + logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to restart {} affected processors", groupId, affectedProcessors.size()); + scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep()); + } else { + logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId); + updateRequest.getStartProcessorsStep().setComplete(true); } updateRequest.setComplete(true); @@ -799,34 +808,45 @@ public class ProcessGroupResource extends ApplicationResource { /** * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State. * - * @param client the Jersey Client to use for making the request * @param groupId the ID of the Process Group to poll * @param processorIds the ID of all Processors whose state should be equal to the given desired state * @param desiredState the desired state for all processors with the ID's given * @param pause the Pause that can be used to wait between polling * @return true if successful, false if unable to wait for processors to reach the desired state */ - private boolean waitForProcessorStatus(final Client client, final URI originalUri, final String groupId, final Set processorIds, final ScheduledState desiredState, final Pause pause) { + private boolean waitForProcessorStatus(final URI originalUri, final String groupId, final Set processorIds, final ScheduledState desiredState, + final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException { URI groupUri; try { groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), - originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/status", "recursive=true", originalUri.getFragment()); + originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment()); } catch (URISyntaxException e) { throw new RuntimeException(e); } + final Map headers = new HashMap<>(); + final MultivaluedMap requestEntity = new MultivaluedMapImpl(); + boolean continuePolling = true; while (continuePolling) { - final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class); - if (response.getStatus() != Status.OK.getStatusCode()) { + + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { return false; } - final ProcessGroupStatusEntity statusEntity = response.getEntity(ProcessGroupStatusEntity.class); - final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus(); - final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot(); + final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class); + final Set processorEntities = processorsEntity.getProcessors(); - if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) { + if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) { logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState); return true; } @@ -847,14 +867,14 @@ public class ProcessGroupResource extends ApplicationResource { * @param pause the Pause that can be used to wait between polling * @return true if successful, false if unable to wait for processors to reach the desired state */ - private boolean waitForLocalProcessorStatus(final String groupId, final Set processorIds, final ScheduledState desiredState, final Pause pause) { + private boolean waitForLocalProcessor(final String groupId, final Set processorIds, final ScheduledState desiredState, + final VariableRegistryUpdateRequest updateRequest, final Pause pause) { + boolean continuePolling = true; while (continuePolling) { - final ProcessGroupStatusEntity statusEntity = serviceFacade.getProcessGroupStatus(groupId, true); - final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus(); - final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot(); + final Set processorEntities = serviceFacade.getProcessors(groupId, true); - if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) { + if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) { logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState); return true; } @@ -866,55 +886,93 @@ public class ProcessGroupResource extends ApplicationResource { return false; } - private boolean isProcessorStatusEqual(final ProcessGroupStatusSnapshotDTO statusSnapshot, final Set processorIds, final ScheduledState desiredState) { + private boolean isProcessorActionComplete(final Set processorEntities, final VariableRegistryUpdateRequest updateRequest, + final Set processorIds, final ScheduledState desiredState) { + final String desiredStateName = desiredState.name(); - final boolean allProcessorsMatch = statusSnapshot.getProcessorStatusSnapshots().stream() - .map(entity -> entity.getProcessorStatusSnapshot()) - .filter(status -> processorIds.contains(status.getId())) - .allMatch(status -> { - final String runStatus = status.getRunStatus(); - final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus); - if (!stateMatches) { - return false; - } + // update the affected processors + processorEntities.stream() + .filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId()); + affectedComponentEntity.setRevision(entity.getRevision()); - if (desiredState == ScheduledState.STOPPED && status.getActiveThreadCount() != 0) { - return false; - } + // only consider update this component if the user had permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); + affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); - return true; - }); + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); + } + } + }); + + final boolean allProcessorsMatch = processorEntities.stream() + .filter(entity -> processorIds.contains(entity.getId())) + .allMatch(entity -> { + final ProcessorStatusDTO status = entity.getStatus(); + + final String runStatus = status.getAggregateSnapshot().getRunStatus(); + final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus); + if (!stateMatches) { + return false; + } + + if (desiredState == ScheduledState.STOPPED && status.getAggregateSnapshot().getActiveThreadCount() != 0) { + return false; + } + + return true; + }); if (!allProcessorsMatch) { return false; } - for (final ProcessGroupStatusSnapshotEntity childGroupEntity : statusSnapshot.getProcessGroupStatusSnapshots()) { - final ProcessGroupStatusSnapshotDTO childGroupStatus = childGroupEntity.getProcessGroupStatusSnapshot(); - final boolean allMatchChildLevel = isProcessorStatusEqual(childGroupStatus, processorIds, desiredState); - if (!allMatchChildLevel) { - return false; - } - } - return true; } + /** + * Updates the affected controller services in the specified updateRequest with the serviceEntities. + * + * @param serviceEntities service entities + * @param updateRequest update request + */ + private void updateAffectedControllerServices(final Set serviceEntities, final VariableRegistryUpdateRequest updateRequest) { + // update the affected components + serviceEntities.stream() + .filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId()); + affectedComponentEntity.setRevision(entity.getRevision()); + // only consider update this component if the user had permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(entity.getComponent().getState()); + + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); + } + } + }); + } /** * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. * - * @param client the Jersey Client to use for making the HTTP Request * @param groupId the ID of the Process Group to poll * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state * @param desiredState the desired state for all services with the ID's given * @param pause the Pause that can be used to wait between polling * @return true if successful, false if unable to wait for services to reach the desired state */ - private boolean waitForControllerServiceStatus(final Client client, final URI originalUri, final String groupId, final Set serviceIds, final ControllerServiceState desiredState, - final Pause pause) { + private boolean waitForControllerServiceStatus(final URI originalUri, final String groupId, final Set serviceIds, + final ControllerServiceState desiredState, final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException { + URI groupUri; try { groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), @@ -923,16 +981,31 @@ public class ProcessGroupResource extends ApplicationResource { throw new RuntimeException(e); } + final Map headers = new HashMap<>(); + final MultivaluedMap requestEntity = new MultivaluedMapImpl(); + boolean continuePolling = true; while (continuePolling) { - final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class); - if (response.getStatus() != Status.OK.getStatusCode()) { + + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { return false; } - final ControllerServicesEntity controllerServicesEntity = response.getEntity(ControllerServicesEntity.class); + final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class); final Set serviceEntities = controllerServicesEntity.getControllerServices(); + // update the affected controller services + updateAffectedControllerServices(serviceEntities, updateRequest); + final String desiredStateName = desiredState.name(); final boolean allServicesMatch = serviceEntities.stream() .map(entity -> entity.getComponent()) @@ -963,11 +1036,16 @@ public class ProcessGroupResource extends ApplicationResource { * @param user the user that is retrieving the controller services * @return true if successful, false if unable to wait for services to reach the desired state */ - private boolean waitForLocalControllerServiceStatus(final String groupId, final Set serviceIds, final ControllerServiceState desiredState, final Pause pause, final NiFiUser user) { + private boolean waitForLocalControllerServiceStatus(final String groupId, final Set serviceIds, final ControllerServiceState desiredState, + final VariableRegistryUpdateRequest updateRequest, final Pause pause, final NiFiUser user) { + boolean continuePolling = true; while (continuePolling) { final Set serviceEntities = serviceFacade.getControllerServices(groupId, false, true, user); + // update the affected controller services + updateAffectedControllerServices(serviceEntities, updateRequest); + final String desiredStateName = desiredState.name(); final boolean allServicesMatch = serviceEntities.stream() .map(entity -> entity.getComponent()) @@ -975,6 +1053,7 @@ public class ProcessGroupResource extends ApplicationResource { .map(service -> service.getState()) .allMatch(state -> desiredStateName.equals(state)); + if (allServicesMatch) { logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState); return true; @@ -987,8 +1066,8 @@ public class ProcessGroupResource extends ApplicationResource { return false; } - private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId) { - final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId); + private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId, final Set affectedComponents, final NiFiUser user) { + final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId, affectedComponents, user); // before adding to the request map, purge any old requests. Must do this by creating a List of ID's // and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException. @@ -1011,27 +1090,26 @@ public class ProcessGroupResource extends ApplicationResource { return updateRequest; } - private Response updateVariableRegistryLocal(final String groupId, final List affectedProcessors, final List affectedServices, - final VariableRegistryEntity requestEntity) { + private Response updateVariableRegistryLocal(final String groupId, final Set affectedComponents, final List affectedProcessors, + final List affectedServices, final NiFiUser user, final VariableRegistryEntity requestEntity) { final Set affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream() - .map(component -> component.getComponentId()) + .map(component -> component.getId()) .collect(Collectors.toSet()); Map processorRevisionMap = getRevisions(groupId, affectedProcessorIds); final Set affectedServiceIds = affectedServices == null ? Collections.emptySet() : affectedServices.stream() - .map(component -> component.getComponentId()) + .map(component -> component.getId()) .collect(Collectors.toSet()); Map serviceRevisionMap = getRevisions(groupId, affectedServiceIds); // update the variable registry - final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId); + final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, affectedComponents, user); updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); final Pause pause = createPause(updateRequest); final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); - final NiFiUser user = NiFiUserUtils.getNiFiUser(); final Runnable updateTask = new Runnable() { @Override public void run() { @@ -1052,21 +1130,26 @@ public class ProcessGroupResource extends ApplicationResource { // Apply the updates performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to Variable Registry", - () -> serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry())); + () -> { + final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry()); + updateRequest.setProcessGroupRevision(entity.getProcessGroupRevision()); + }); // Re-enable the controller services performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller Services", - () -> enableControllerServices(user, groupId, updatedServiceRevisionMap, pause)); + () -> enableControllerServices(user, updateRequest, groupId, updatedServiceRevisionMap, pause)); // Restart processors performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors", - () -> startProcessors(user, groupId, updatedProcessorRevisionMap, pause)); + () -> startProcessors(user, updateRequest, groupId, updatedProcessorRevisionMap, pause)); // Set complete updateRequest.setComplete(true); updateRequest.setLastUpdated(new Date()); } catch (final Exception e) { logger.error("Failed to update Variable Registry for Proces Group with ID " + groupId, e); + + updateRequest.setComplete(true); updateRequest.setFailureReason("An unexpected error has occurred: " + e); } } @@ -1076,10 +1159,11 @@ public class ProcessGroupResource extends ApplicationResource { variableRegistryThreadPool.submit(updateTask); final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity(); - responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); - responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); + responseEntity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); + responseEntity.setProcessGroupRevision(updateRequest.getProcessGroupRevision()); + responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); - final URI location = URI.create(responseEntity.getRequestDto().getUri()); + final URI location = URI.create(responseEntity.getRequest().getUri()); return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); } @@ -1103,11 +1187,12 @@ public class ProcessGroupResource extends ApplicationResource { action.run(); step.setComplete(true); } catch (final Exception e) { - request.setComplete(true); logger.error("Failed to update variable registry for Process Group with ID {}", groupId, e); step.setComplete(true); step.setFailureReason(e.getMessage()); + + request.setComplete(true); request.setFailureReason("Failed to update Variable Registry because failed while performing step: " + stepDescription); } @@ -1123,21 +1208,21 @@ public class ProcessGroupResource extends ApplicationResource { serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions.keySet()); serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.STOPPED, processorRevisions); - waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, pause); + waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, updateRequest, pause); } - private void startProcessors(final NiFiUser user, final String processGroupId, final Map processorRevisions, final Pause pause) { + private void startProcessors(final NiFiUser user, final VariableRegistryUpdateRequest request, final String processGroupId, final Map processorRevisions, final Pause pause) { if (processorRevisions.isEmpty()) { return; } serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions.keySet()); serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.RUNNING, processorRevisions); - waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, pause); + waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, request, pause); } private void disableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId, - final Map serviceRevisions, final Pause pause) { + final Map serviceRevisions, final Pause pause) { if (serviceRevisions.isEmpty()) { return; @@ -1145,116 +1230,141 @@ public class ProcessGroupResource extends ApplicationResource { serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions.keySet()); serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.DISABLED, serviceRevisions); - waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, pause, user); + waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, updateRequest, pause, user); } - private void enableControllerServices(final NiFiUser user, final String processGroupId, final Map serviceRevisions, final Pause pause) { + private void enableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId, + final Map serviceRevisions, final Pause pause) { + if (serviceRevisions.isEmpty()) { return; } serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions.keySet()); serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.ENABLED, serviceRevisions); - waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, pause, user); + waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, updateRequest, pause, user); } - private void scheduleProcessors(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest, - final Pause pause, final Collection affectedProcessors, final ScheduledState desiredState, final VariableRegistryUpdateStep updateStep) { + private void scheduleProcessors(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest, + final Pause pause, final Collection affectedProcessors, final ScheduledState desiredState, + final VariableRegistryUpdateStep updateStep) throws InterruptedException { + final Set affectedProcessorIds = affectedProcessors.stream() - .map(component -> component.getComponentId()) + .map(component -> component.getId()) .collect(Collectors.toSet()); final Map processorRevisionMap = getRevisions(groupId, affectedProcessorIds); final Map processorRevisionDtoMap = processorRevisionMap.entrySet().stream().collect( Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue()))); - final ScheduleComponentsEntity stopProcessorsEntity = new ScheduleComponentsEntity(); - stopProcessorsEntity.setComponents(processorRevisionDtoMap); - stopProcessorsEntity.setId(groupId); - stopProcessorsEntity.setState(desiredState.name()); + final ScheduleComponentsEntity scheduleProcessorsEntity = new ScheduleComponentsEntity(); + scheduleProcessorsEntity.setComponents(processorRevisionDtoMap); + scheduleProcessorsEntity.setId(groupId); + scheduleProcessorsEntity.setState(desiredState.name()); - URI stopProcessorUri; + URI scheduleGroupUri; try { - stopProcessorUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + scheduleGroupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, originalUri.getFragment()); } catch (URISyntaxException e) { throw new RuntimeException(e); } - final ClientResponse stopProcessorResponse = jerseyClient.resource(stopProcessorUri) - .header("Content-Type", "application/json") - .entity(stopProcessorsEntity) - .put(ClientResponse.class); + final Map headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); - final int stopProcessorStatus = stopProcessorResponse.getStatus(); + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse(); + } + + final int stopProcessorStatus = clusterResponse.getStatus(); if (stopProcessorStatus != Status.OK.getStatusCode()) { updateRequest.getStopProcessorsStep().setFailureReason("Failed while " + updateStep.getDescription()); + + updateStep.setComplete(true); updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); return; } updateRequest.setLastUpdated(new Date()); - final boolean processorsTransitioned = waitForProcessorStatus(jerseyClient, originalUri, groupId, affectedProcessorIds, desiredState, pause); - if (processorsTransitioned) { - updateStep.setComplete(true); - } else { + final boolean processorsTransitioned = waitForProcessorStatus(originalUri, groupId, affectedProcessorIds, desiredState, updateRequest, pause); + updateStep.setComplete(true); + + if (!processorsTransitioned) { updateStep.setFailureReason("Failed while " + updateStep.getDescription()); + + updateRequest.setComplete(true); updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); - return; } } - private void activateControllerServices(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest, - final Pause pause, final Collection affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) { + private void activateControllerServices(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest, + final Pause pause, final Collection affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) throws InterruptedException { final Set affectedServiceIds = affectedServices.stream() - .map(component -> component.getComponentId()) + .map(component -> component.getId()) .collect(Collectors.toSet()); final Map serviceRevisionMap = getRevisions(groupId, affectedServiceIds); final Map serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect( Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue()))); - final ActivateControllerServicesEntity disableServicesEntity = new ActivateControllerServicesEntity(); - disableServicesEntity.setComponents(serviceRevisionDtoMap); - disableServicesEntity.setId(groupId); - disableServicesEntity.setState(desiredState.name()); + final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity(); + activateServicesEntity.setComponents(serviceRevisionDtoMap); + activateServicesEntity.setId(groupId); + activateServicesEntity.setState(desiredState.name()); - URI disableServicesUri; + URI controllerServicesUri; try { - disableServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + controllerServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment()); } catch (URISyntaxException e) { throw new RuntimeException(e); } - final ClientResponse disableServicesResponse = jerseyClient.resource(disableServicesUri) - .header("Content-Type", "application/json") - .entity(disableServicesEntity) - .put(ClientResponse.class); + final Map headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); - final int disableServicesStatus = disableServicesResponse.getStatus(); + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse(); + } + + final int disableServicesStatus = clusterResponse.getStatus(); if (disableServicesStatus != Status.OK.getStatusCode()) { updateStep.setFailureReason("Failed while " + updateStep.getDescription()); + + updateStep.setComplete(true); updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); return; } updateRequest.setLastUpdated(new Date()); - if (waitForControllerServiceStatus(jerseyClient, originalUri, groupId, affectedServiceIds, desiredState, pause)) { - updateStep.setComplete(true); - } else { + final boolean serviceTransitioned = waitForControllerServiceStatus(originalUri, groupId, affectedServiceIds, desiredState, updateRequest, pause); + updateStep.setComplete(true); + + if (!serviceTransitioned) { updateStep.setFailureReason("Failed while " + updateStep.getDescription()); + + updateRequest.setComplete(true); updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); - return; } } + private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest, + final VariableRegistryEntity updateEntity) throws InterruptedException, IOException { - private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest, - final VariableRegistryEntity updateEntity) { - + // convert request accordingly URI applyUpdatesUri; try { applyUpdatesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), @@ -1263,18 +1373,50 @@ public class ProcessGroupResource extends ApplicationResource { throw new RuntimeException(e); } - final ClientResponse applyUpdatesResponse = jerseyClient.resource(applyUpdatesUri) - .header("Content-Type", "application/json") - .entity(updateEntity) - .put(ClientResponse.class); + final Map headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); - final int applyUpdatesStatus = applyUpdatesResponse.getStatus(); - updateRequest.setLastUpdated(new Date()); - if (applyUpdatesStatus != Status.OK.getStatusCode()) { - updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry"); - updateRequest.setFailureReason("Failed to apply updates to the Variable Registry"); - return; + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse(); } + + final int applyUpdatesStatus = clusterResponse.getStatus(); + updateRequest.setLastUpdated(new Date()); + updateRequest.getApplyUpdatesStep().setComplete(true); + + if (applyUpdatesStatus == Status.OK.getStatusCode()) { + // grab the current process group revision + final VariableRegistryEntity entity = getResponseEntity(clusterResponse, VariableRegistryEntity.class); + updateRequest.setProcessGroupRevision(entity.getProcessGroupRevision()); + } else { + final String message = getResponseEntity(clusterResponse, String.class); + + // update the request progress + updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry: " + message); + updateRequest.setComplete(true); + updateRequest.setFailureReason("Failed to apply updates to the Variable Registry: " + message); + } + } + + /** + * Extracts the response entity from the specified node response. + * + * @param nodeResponse node response + * @param clazz class + * @param type of class + * @return the response entity + */ + private T getResponseEntity(final NodeResponse nodeResponse, final Class clazz) { + T entity = (T) nodeResponse.getUpdatedEntity(); + if (entity == null) { + entity = nodeResponse.getClientResponse().getEntity(clazz); + } + return entity; } /** @@ -1676,7 +1818,8 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = true ) - @PathParam("id") final String groupId) { + @PathParam("id") final String groupId, + @ApiParam("Whether or not to include processors from descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups) { if (isReplicateRequest()) { return replicate(HttpMethod.GET); @@ -1689,7 +1832,7 @@ public class ProcessGroupResource extends ApplicationResource { }); // get the processors - final Set processors = serviceFacade.getProcessors(groupId); + final Set processors = serviceFacade.getProcessors(groupId, includeDescendantGroups); // create the response entity final ProcessorsEntity entity = new ProcessorsEntity(); @@ -3121,7 +3264,6 @@ public class ProcessGroupResource extends ApplicationResource { uriBuilder.segment("process-groups", groupId, "templates", "import"); final URI importUri = uriBuilder.build(); - // change content type to XML for serializing entity final Map headersToOverride = new HashMap<>(); headersToOverride.put("content-type", MediaType.APPLICATION_XML); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index ed42e9f327..1a4d52b20d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,33 +16,6 @@ */ package org.apache.nifi.web.api.dto; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import javax.ws.rs.WebApplicationException; - import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -182,6 +155,7 @@ import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.entity.AccessPolicyEntity; import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.AllowableValueEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ComponentReferenceEntity; @@ -196,6 +170,32 @@ import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.revision.RevisionManager; +import javax.ws.rs.WebApplicationException; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -1737,13 +1737,29 @@ public final class DtoFactory { public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) { final AffectedComponentDTO dto = new AffectedComponentDTO(); - dto.setComponentId(component.getIdentifier()); - dto.setParentGroupId(component.getProcessGroupIdentifier()); + dto.setId(component.getIdentifier()); + dto.setName(component.getName()); + dto.setProcessGroupId(component.getProcessGroupIdentifier()); if (component instanceof ProcessorNode) { - dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); + final ProcessorNode node = ((ProcessorNode) component); + dto.setState(node.getScheduledState().name()); + dto.setActiveThreadCount(node.getActiveThreadCount()); + dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); } else if (component instanceof ControllerServiceNode) { - dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + final ControllerServiceNode node = ((ControllerServiceNode) component); + dto.setState(node.getState().name()); + dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + } + + final Collection validationErrors = component.getValidationErrors(); + if (validationErrors != null && !validationErrors.isEmpty()) { + final List errors = new ArrayList<>(); + for (final ValidationResult validationResult : validationErrors) { + errors.add(validationResult.toString()); + } + + dto.setValidationErrors(errors); } return dto; @@ -2114,8 +2130,18 @@ public final class DtoFactory { return deprecationNotice == null ? null : deprecationNotice.reason(); } + public Set createAffectedComponentEntities(final Set affectedComponents, final RevisionManager revisionManager) { + return affectedComponents.stream() + .map(component -> { + final AffectedComponentDTO affectedComponent = createAffectedComponentDto(component); + final PermissionsDTO permissions = createPermissionsDto(component); + final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(component.getIdentifier())); + return entityFactory.createAffectedComponentEntity(affectedComponent, revision, permissions); + }) + .collect(Collectors.toSet()); + } - public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup) { + public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup, final RevisionManager revisionManager) { final ComponentVariableRegistry variableRegistry = processGroup.getVariableRegistry(); final List variableNames = variableRegistry.getVariableMap().keySet().stream() @@ -2130,21 +2156,18 @@ public final class DtoFactory { variableDto.setValue(variableRegistry.getVariableValue(variableName)); variableDto.setProcessGroupId(processGroup.getIdentifier()); - final Set affectedComponents = processGroup.getComponentsAffectedByVariable(variableName); - final Set affectedComponentDtos = affectedComponents.stream() - .map(component -> createAffectedComponentDto(component)) - .collect(Collectors.toSet()); + final Set affectedComponentEntities = createAffectedComponentEntities(processGroup.getComponentsAffectedByVariable(variableName), revisionManager); boolean canWrite = true; - for (final ConfiguredComponent component : affectedComponents) { - final PermissionsDTO permissions = createPermissionsDto(component); + for (final AffectedComponentEntity affectedComponent : affectedComponentEntities) { + final PermissionsDTO permissions = affectedComponent.getPermissions(); if (!permissions.getCanRead() || !permissions.getCanWrite()) { canWrite = false; break; } } - variableDto.setAffectedComponents(affectedComponentDtos); + variableDto.setAffectedComponents(affectedComponentEntities); final VariableEntity variableEntity = new VariableEntity(); variableEntity.setVariable(variableDto); @@ -2178,6 +2201,8 @@ public final class DtoFactory { updateSteps.add(createVariableRegistryUpdateStepDto(request.getStartProcessorsStep())); dto.setUpdateSteps(updateSteps); + dto.setAffectedComponents(new HashSet<>(request.getAffectedComponents().values())); + return dto; } @@ -2190,42 +2215,41 @@ public final class DtoFactory { } - public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group) { + public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group, final RevisionManager revisionManager) { if (!group.getIdentifier().equals(variableRegistry.getProcessGroupId())) { throw new IllegalArgumentException("Variable Registry does not have the same Group ID as the given Process Group"); } final Set variableEntities = new LinkedHashSet<>(); - for (final VariableEntity inputEntity : variableRegistry.getVariables()) { - final VariableEntity entity = new VariableEntity(); + if (variableRegistry.getVariables() != null) { + for (final VariableEntity inputEntity : variableRegistry.getVariables()) { + final VariableEntity entity = new VariableEntity(); - final VariableDTO inputDto = inputEntity.getVariable(); - final VariableDTO variableDto = new VariableDTO(); - variableDto.setName(inputDto.getName()); - variableDto.setValue(inputDto.getValue()); - variableDto.setProcessGroupId(group.getIdentifier()); + final VariableDTO inputDto = inputEntity.getVariable(); + final VariableDTO variableDto = new VariableDTO(); + variableDto.setName(inputDto.getName()); + variableDto.setValue(inputDto.getValue()); + variableDto.setProcessGroupId(group.getIdentifier()); - final Set affectedComponents = group.getComponentsAffectedByVariable(variableDto.getName()); - final Set affectedComponentDtos = affectedComponents.stream() - .map(component -> createAffectedComponentDto(component)) - .collect(Collectors.toSet()); + final Set affectedComponentEntities = createAffectedComponentEntities(group.getComponentsAffectedByVariable(variableDto.getName()), revisionManager); - boolean canWrite = true; - for (final ConfiguredComponent component : affectedComponents) { - final PermissionsDTO permissions = createPermissionsDto(component); - if (!permissions.getCanRead() || !permissions.getCanWrite()) { - canWrite = false; - break; + boolean canWrite = true; + for (final AffectedComponentEntity affectedComponent : affectedComponentEntities) { + final PermissionsDTO permissions = affectedComponent.getPermissions(); + if (!permissions.getCanRead() || !permissions.getCanWrite()) { + canWrite = false; + break; + } } + + variableDto.setAffectedComponents(affectedComponentEntities); + + entity.setCanWrite(canWrite); + entity.setVariable(inputDto); + + variableEntities.add(entity); } - - variableDto.setAffectedComponents(affectedComponentDtos); - - entity.setCanWrite(canWrite); - entity.setVariable(inputDto); - - variableEntities.add(entity); } final VariableRegistryDTO registryDto = new VariableRegistryDTO(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java index a7f370a947..16781c6a24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java @@ -33,6 +33,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.AccessPolicyEntity; import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; import org.apache.nifi.web.api.entity.ActionEntity; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.AllowableValueEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ComponentReferenceEntity; @@ -311,6 +312,20 @@ public final class EntityFactory { return entity; } + public AffectedComponentEntity createAffectedComponentEntity(final AffectedComponentDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) { + final AffectedComponentEntity entity = new AffectedComponentEntity(); + entity.setRevision(revision); + if (dto != null) { + entity.setPermissions(permissions); + entity.setId(dto.getId()); + + if (permissions != null && permissions.getCanRead()) { + entity.setComponent(dto); + } + } + return entity; + } + public UserGroupEntity createUserGroupEntity(final UserGroupDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) { final UserGroupEntity entity = new UserGroupEntity(); entity.setRevision(revision); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java index 1d881613fd..a1bf170128 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java @@ -59,9 +59,10 @@ public interface ProcessorDAO { * Gets all the Processor transfer objects for this controller. * * @param groupId group id + * @param includeDescendants if processors from descendant groups should be included * @return List of all the Processors */ - Set getProcessors(String groupId); + Set getProcessors(String groupId, boolean includeDescendants); /** * Verifies the specified processor can be updated. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index e11f9ad06e..429592c51d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -58,6 +58,7 @@ import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; +import java.util.stream.Collectors; public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { @@ -307,9 +308,13 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { } @Override - public Set getProcessors(String groupId) { + public Set getProcessors(String groupId, boolean includeDescendants) { ProcessGroup group = locateProcessGroup(flowController, groupId); - return group.getProcessors(); + if (includeDescendants) { + return group.findAllProcessors().stream().collect(Collectors.toSet()); + } else { + return group.getProcessors(); + } } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml index 0d60df2e3c..72cd5edbea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml @@ -461,6 +461,7 @@ ${staging.dir}/js/nf/canvas/nf-port-configuration.js ${staging.dir}/js/nf/canvas/nf-port-details.js ${staging.dir}/js/nf/canvas/nf-process-group-configuration.js + ${staging.dir}/js/nf/canvas/nf-variable-registry.js ${staging.dir}/js/nf/canvas/nf-component-version.js ${staging.dir}/js/nf/canvas/nf-remote-process-group-configuration.js ${staging.dir}/js/nf/canvas/nf-remote-process-group-details.js diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/canvas.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/canvas.properties index a3ab7fc465..9b04ecec9e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/canvas.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/canvas.properties @@ -43,6 +43,7 @@ nf.canvas.script.tags=\n\ \n\ \n\ +\n\ \n\ \n\ \n\ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/canvas.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/canvas.jsp index 1928f7a695..3c7d407bc8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/canvas.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/canvas.jsp @@ -129,6 +129,7 @@ + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/variable-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/variable-configuration.jsp new file mode 100644 index 0000000000..f26c2ebbdc --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/variable-configuration.jsp @@ -0,0 +1,93 @@ +<%-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--%> +<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %> + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css index 49b7376eaa..fe5808e26d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css @@ -79,6 +79,10 @@ ul.referencing-component-listing li { white-space: nowrap; } +div.referencing-component-state { + width: 13px; +} + div.referencing-component-state.disabled:before { content: '\e802'; font-family: flowfont; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/dialog.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/dialog.css index 1e01919cbb..cb5282ba81 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/dialog.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/dialog.css @@ -212,6 +212,45 @@ div.progress-label { height: 575px; } +/* + Variable Registry + */ + +#variable-registry-dialog { + width: 850px; + height: 575px; +} + +#variable-registry-dialog div.settings-left { + float: left; + width: 65%; +} + +#variable-registry-dialog div.settings-right { + float: left; + width: 33%; +} + +#variable-registry-table { + height: 400px; +} + +#add-variable { + float: right; + margin-bottom: 4px; + font-size: 16px; + text-transform: uppercase; +} + +li.affected-component-container { + margin-bottom: 3px; + height: 16px; +} + +div.slick-cell div.overridden { + text-decoration: line-through; +} + /* General dialog styles. */ @@ -239,3 +278,15 @@ ul.result li { float: left; width: 2%; } + +div.variable-step { + width: 16px; + height: 16px; + background-color: transparent; + float: right; +} + +#variable-update-steps li { + width: 300px; + margin-bottom: 2px; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/jquery/propertytable/jquery.propertytable.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/jquery/propertytable/jquery.propertytable.js index d1006ab6c1..abd27b2bd0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/jquery/propertytable/jquery.propertytable.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/jquery/propertytable/jquery.propertytable.js @@ -1682,14 +1682,14 @@ // build the new property dialog var newPropertyDialogMarkup = ''; var newPropertyDialog = $(newPropertyDialogMarkup).appendTo(options.dialogContainer); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js index 7050df0aa3..90a1140616 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js @@ -31,6 +31,7 @@ 'nf.GoTo', 'nf.ng.Bridge', 'nf.Shell', + 'nf.VariableRegistry', 'nf.ComponentState', 'nf.Draggable', 'nf.Birdseye', @@ -54,8 +55,8 @@ 'nf.ComponentVersion', 'nf.QueueListing', 'nf.StatusHistory'], - function ($, d3, nfCanvasUtils, nfCommon, nfDialog, nfClient, nfErrorHandler, nfClipboard, nfSnippet, nfGoto, nfNgBridge, nfShell, nfComponentState, nfDraggable, nfBirdseye, nfConnection, nfGraph, nfProcessGroupConfiguration, nfProcessorConfiguration, nfProcessorDetails, nfLabelConfiguration, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupDetails, nfPortConfiguration, nfPortDetails, nfConnectionConfiguration, nfConnectionDetails, nfPolicyManagement, nfRemoteProcessGroup, nfLabel, nfProcessor, nfRemoteProcessGroupPorts, nfComponentVersion, nfQueueListing, nfStatusHistory) { - return (nf.Actions = factory($, d3, nfCanvasUtils, nfCommon, nfDialog, nfClient, nfErrorHandler, nfClipboard, nfSnippet, nfGoto, nfNgBridge, nfShell, nfComponentState, nfDraggable, nfBirdseye, nfConnection, nfGraph, nfProcessGroupConfiguration, nfProcessorConfiguration, nfProcessorDetails, nfLabelConfiguration, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupDetails, nfPortConfiguration, nfPortDetails, nfConnectionConfiguration, nfConnectionDetails, nfPolicyManagement, nfRemoteProcessGroup, nfLabel, nfProcessor, nfRemoteProcessGroupPorts, nfComponentVersion, nfQueueListing, nfStatusHistory)); + function ($, d3, nfCanvasUtils, nfCommon, nfDialog, nfClient, nfErrorHandler, nfClipboard, nfSnippet, nfGoto, nfNgBridge, nfShell, nfVariableRegistry, nfComponentState, nfDraggable, nfBirdseye, nfConnection, nfGraph, nfProcessGroupConfiguration, nfProcessorConfiguration, nfProcessorDetails, nfLabelConfiguration, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupDetails, nfPortConfiguration, nfPortDetails, nfConnectionConfiguration, nfConnectionDetails, nfPolicyManagement, nfRemoteProcessGroup, nfLabel, nfProcessor, nfRemoteProcessGroupPorts, nfComponentVersion, nfQueueListing, nfStatusHistory) { + return (nf.Actions = factory($, d3, nfCanvasUtils, nfCommon, nfDialog, nfClient, nfErrorHandler, nfClipboard, nfSnippet, nfGoto, nfNgBridge, nfShell, nfVariableRegistry, nfComponentState, nfDraggable, nfBirdseye, nfConnection, nfGraph, nfProcessGroupConfiguration, nfProcessorConfiguration, nfProcessorDetails, nfLabelConfiguration, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupDetails, nfPortConfiguration, nfPortDetails, nfConnectionConfiguration, nfConnectionDetails, nfPolicyManagement, nfRemoteProcessGroup, nfLabel, nfProcessor, nfRemoteProcessGroupPorts, nfComponentVersion, nfQueueListing, nfStatusHistory)); }); } else if (typeof exports === 'object' && typeof module === 'object') { module.exports = (nf.Actions = @@ -71,6 +72,7 @@ require('nf.GoTo'), require('nf.ng.Bridge'), require('nf.Shell'), + require('nf.VariableRegistry'), require('nf.ComponentState'), require('nf.Draggable'), require('nf.Birdseye'), @@ -107,6 +109,7 @@ root.nf.GoTo, root.nf.ng.Bridge, root.nf.Shell, + root.nf.VariableRegistry, root.nf.ComponentState, root.nf.Draggable, root.nf.Birdseye, @@ -131,7 +134,7 @@ root.nf.QueueListing, root.nf.StatusHistory); } -}(this, function ($, d3, nfCanvasUtils, nfCommon, nfDialog, nfClient, nfErrorHandler, nfClipboard, nfSnippet, nfGoto, nfNgBridge, nfShell, nfComponentState, nfDraggable, nfBirdseye, nfConnection, nfGraph, nfProcessGroupConfiguration, nfProcessorConfiguration, nfProcessorDetails, nfLabelConfiguration, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupDetails, nfPortConfiguration, nfPortDetails, nfConnectionConfiguration, nfConnectionDetails, nfPolicyManagement, nfRemoteProcessGroup, nfLabel, nfProcessor, nfRemoteProcessGroupPorts, nfComponentVersion, nfQueueListing, nfStatusHistory) { +}(this, function ($, d3, nfCanvasUtils, nfCommon, nfDialog, nfClient, nfErrorHandler, nfClipboard, nfSnippet, nfGoto, nfNgBridge, nfShell, nfVariableRegistry, nfComponentState, nfDraggable, nfBirdseye, nfConnection, nfGraph, nfProcessGroupConfiguration, nfProcessorConfiguration, nfProcessorDetails, nfLabelConfiguration, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupDetails, nfPortConfiguration, nfPortDetails, nfConnectionConfiguration, nfConnectionDetails, nfPolicyManagement, nfRemoteProcessGroup, nfLabel, nfProcessor, nfRemoteProcessGroupPorts, nfComponentVersion, nfQueueListing, nfStatusHistory) { 'use strict'; var config = { @@ -1233,6 +1236,22 @@ nfComponentState.showState(processor, nfCanvasUtils.isConfigurable(selection)); }, + /** + * Opens the variable registry for the specified selection of the current group if the selection is emtpy. + * + * @param {selection} selection + */ + openVariableRegistry: function (selection) { + if (selection.empty()) { + nfVariableRegistry.showVariables(nfCanvasUtils.getGroupId()); + } else if (selection.size() === 1) { + var selectionData = selection.datum(); + if (nfCanvasUtils.isProcessGroup(selection)) { + nfVariableRegistry.showVariables(selectionData.id); + } + } + }, + /** * Views the state for the specified processor. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-bootstrap.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-bootstrap.js index 77ef67a3a0..e3ab5c966b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-bootstrap.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-bootstrap.js @@ -37,6 +37,7 @@ 'nf.Snippet', 'nf.Actions', 'nf.QueueListing', + 'nf.VariableRegistry', 'nf.ComponentState', 'nf.ComponentVersion', 'nf.Draggable', @@ -81,8 +82,8 @@ 'nf.ng.Canvas.OperateCtrl', 'nf.ng.BreadcrumbsDirective', 'nf.ng.DraggableDirective'], - function ($, angular, nfCommon, nfCanvasUtils, nfErrorHandler, nfClient, nfClusterSummary, nfDialog, nfStorage, nfCanvas, nfGraph, nfContextMenu, nfQuickSelect, nfShell, nfSettings, nfActions, nfSnippet, nfQueueListing, nfComponentState, nfComponentVersion, nfDraggable, nfConnectable, nfStatusHistory, nfBirdseye, nfConnectionConfiguration, nfControllerService, nfReportingTask, nfPolicyManagement, nfProcessorConfiguration, nfProcessGroupConfiguration, nfControllerServices, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupPorts, nfPortConfiguration, nfLabelConfiguration, nfProcessorDetails, nfPortDetails, nfConnectionDetails, nfRemoteProcessGroupDetails, nfGoto, nfNgBridge, appCtrl, appConfig, serviceProvider, breadcrumbsCtrl, headerCtrl, flowStatusCtrl, globalMenuCtrl, toolboxCtrl, processorComponent, inputPortComponent, outputPortComponent, processGroupComponent, remoteProcessGroupComponent, funnelComponent, templateComponent, labelComponent, graphControlsCtrl, navigateCtrl, operateCtrl, breadcrumbsDirective, draggableDirective) { - return factory($, angular, nfCommon, nfCanvasUtils, nfErrorHandler, nfClient, nfClusterSummary, nfDialog, nfStorage, nfCanvas, nfGraph, nfContextMenu, nfQuickSelect, nfShell, nfSettings, nfActions, nfSnippet, nfQueueListing, nfComponentState, nfComponentVersion, nfDraggable, nfConnectable, nfStatusHistory, nfBirdseye, nfConnectionConfiguration, nfControllerService, nfReportingTask, nfPolicyManagement, nfProcessorConfiguration, nfProcessGroupConfiguration, nfControllerServices, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupPorts, nfPortConfiguration, nfLabelConfiguration, nfProcessorDetails, nfPortDetails, nfConnectionDetails, nfRemoteProcessGroupDetails, nfGoto, nfNgBridge, appCtrl, appConfig, serviceProvider, breadcrumbsCtrl, headerCtrl, flowStatusCtrl, globalMenuCtrl, toolboxCtrl, processorComponent, inputPortComponent, outputPortComponent, processGroupComponent, remoteProcessGroupComponent, funnelComponent, templateComponent, labelComponent, graphControlsCtrl, navigateCtrl, operateCtrl, breadcrumbsDirective, draggableDirective); + function ($, angular, nfCommon, nfCanvasUtils, nfErrorHandler, nfClient, nfClusterSummary, nfDialog, nfStorage, nfCanvas, nfGraph, nfContextMenu, nfQuickSelect, nfShell, nfSettings, nfActions, nfSnippet, nfQueueListing, nfVariableRegistry, nfComponentState, nfComponentVersion, nfDraggable, nfConnectable, nfStatusHistory, nfBirdseye, nfConnectionConfiguration, nfControllerService, nfReportingTask, nfPolicyManagement, nfProcessorConfiguration, nfProcessGroupConfiguration, nfControllerServices, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupPorts, nfPortConfiguration, nfLabelConfiguration, nfProcessorDetails, nfPortDetails, nfConnectionDetails, nfRemoteProcessGroupDetails, nfGoto, nfNgBridge, appCtrl, appConfig, serviceProvider, breadcrumbsCtrl, headerCtrl, flowStatusCtrl, globalMenuCtrl, toolboxCtrl, processorComponent, inputPortComponent, outputPortComponent, processGroupComponent, remoteProcessGroupComponent, funnelComponent, templateComponent, labelComponent, graphControlsCtrl, navigateCtrl, operateCtrl, breadcrumbsDirective, draggableDirective) { + return factory($, angular, nfCommon, nfCanvasUtils, nfErrorHandler, nfClient, nfClusterSummary, nfDialog, nfStorage, nfCanvas, nfGraph, nfContextMenu, nfQuickSelect, nfShell, nfSettings, nfActions, nfSnippet, nfQueueListing, nfVariableRegistry, nfComponentState, nfComponentVersion, nfDraggable, nfConnectable, nfStatusHistory, nfBirdseye, nfConnectionConfiguration, nfControllerService, nfReportingTask, nfPolicyManagement, nfProcessorConfiguration, nfProcessGroupConfiguration, nfControllerServices, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupPorts, nfPortConfiguration, nfLabelConfiguration, nfProcessorDetails, nfPortDetails, nfConnectionDetails, nfRemoteProcessGroupDetails, nfGoto, nfNgBridge, appCtrl, appConfig, serviceProvider, breadcrumbsCtrl, headerCtrl, flowStatusCtrl, globalMenuCtrl, toolboxCtrl, processorComponent, inputPortComponent, outputPortComponent, processGroupComponent, remoteProcessGroupComponent, funnelComponent, templateComponent, labelComponent, graphControlsCtrl, navigateCtrl, operateCtrl, breadcrumbsDirective, draggableDirective); }); } else if (typeof exports === 'object' && typeof module === 'object') { module.exports = factory(require('jquery'), @@ -103,6 +104,7 @@ require('nf.Actions'), require('nf.Snippet'), require('nf.QueueListing'), + require('nf.VariableRegistry'), require('nf.ComponentState'), require('nf.ComponentVersion'), require('nf.Draggable'), @@ -166,6 +168,7 @@ root.nf.Actions, root.nf.Snippet, root.nf.QueueListing, + root.nf.VariableRegistry, root.nf.ComponentState, root.nf.ComponentVersion, root.nf.Draggable, @@ -211,7 +214,7 @@ root.nf.ng.BreadcrumbsDirective, root.nf.ng.DraggableDirective); } -}(this, function ($, angular, nfCommon, nfCanvasUtils, nfErrorHandler, nfClient, nfClusterSummary, nfDialog, nfStorage, nfCanvas, nfGraph, nfContextMenu, nfQuickSelect, nfShell, nfSettings, nfActions, nfSnippet, nfQueueListing, nfComponentState, nfComponentVersion, nfDraggable, nfConnectable, nfStatusHistory, nfBirdseye, nfConnectionConfiguration, nfControllerService, nfReportingTask, nfPolicyManagement, nfProcessorConfiguration, nfProcessGroupConfiguration, nfControllerServices, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupPorts, nfPortConfiguration, nfLabelConfiguration, nfProcessorDetails, nfPortDetails, nfConnectionDetails, nfRemoteProcessGroupDetails, nfGoto, nfNgBridge, appCtrl, appConfig, serviceProvider, breadcrumbsCtrl, headerCtrl, flowStatusCtrl, globalMenuCtrl, toolboxCtrl, processorComponent, inputPortComponent, outputPortComponent, processGroupComponent, remoteProcessGroupComponent, funnelComponent, templateComponent, labelComponent, graphControlsCtrl, navigateCtrl, operateCtrl, breadcrumbsDirective, draggableDirective) { +}(this, function ($, angular, nfCommon, nfCanvasUtils, nfErrorHandler, nfClient, nfClusterSummary, nfDialog, nfStorage, nfCanvas, nfGraph, nfContextMenu, nfQuickSelect, nfShell, nfSettings, nfActions, nfSnippet, nfQueueListing, nfVariableRegistry, nfComponentState, nfComponentVersion, nfDraggable, nfConnectable, nfStatusHistory, nfBirdseye, nfConnectionConfiguration, nfControllerService, nfReportingTask, nfPolicyManagement, nfProcessorConfiguration, nfProcessGroupConfiguration, nfControllerServices, nfRemoteProcessGroupConfiguration, nfRemoteProcessGroupPorts, nfPortConfiguration, nfLabelConfiguration, nfProcessorDetails, nfPortDetails, nfConnectionDetails, nfRemoteProcessGroupDetails, nfGoto, nfNgBridge, appCtrl, appConfig, serviceProvider, breadcrumbsCtrl, headerCtrl, flowStatusCtrl, globalMenuCtrl, toolboxCtrl, processorComponent, inputPortComponent, outputPortComponent, processGroupComponent, remoteProcessGroupComponent, funnelComponent, templateComponent, labelComponent, graphControlsCtrl, navigateCtrl, operateCtrl, breadcrumbsDirective, draggableDirective) { var config = { urls: { @@ -347,6 +350,7 @@ nfSettings.init(); nfActions.init(); nfQueueListing.init(); + nfVariableRegistry.init(); nfComponentState.init(); nfComponentVersion.init(nfSettings); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js index 54f1d14a13..4ead97b8dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js @@ -238,6 +238,25 @@ } }, + /** + * Queries for bulletins for the specified components. + * + * @param {array} componentIds + * @returns {deferred} + */ + queryBulletins: function (componentIds) { + var ids = componentIds.join('|'); + + return $.ajax({ + type: 'GET', + url: '../nifi-api/flow/bulletin-board', + data: { + sourceId: ids + }, + dataType: 'json' + }).fail(nfErrorHandler.handleAjaxError); + }, + /** * Shows the specified component in the specified group. * @@ -282,6 +301,8 @@ }); } }); + + return refreshGraph; } }, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js index 65346b2b85..0ebcf6a59f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js @@ -64,6 +64,15 @@ return nfCanvasUtils.isConfigurable(selection); }; + /** + * Determines whether the component in the specified selection has variables. + * + * @param {selection} selection The selection of currently selected components + */ + var hasVariables = function (selection) { + return selection.empty() || nfCanvasUtils.isProcessGroup(selection); + }; + /** * Determines whether the component in the specified selection has configuration details. * @@ -537,6 +546,7 @@ {separator: true}, {id: 'show-configuration-menu-item', condition: isConfigurable, menuItem: {clazz: 'fa fa-gear', text: 'Configure', action: 'showConfiguration'}}, {id: 'show-details-menu-item', condition: hasDetails, menuItem: {clazz: 'fa fa-gear', text: 'View configuration', action: 'showDetails'}}, + {id: 'variable-registry-menu-item', condition: hasVariables, menuItem: {clazz: 'fa', text: 'Variables', action: 'openVariableRegistry'}}, {separator: true}, {id: 'enter-group-menu-item', condition: isProcessGroup, menuItem: {clazz: 'fa fa-sign-in', text: 'Enter group', action: 'enterGroup'}}, {separator: true}, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js index 90324bf7cf..c7d9c435dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js @@ -586,7 +586,7 @@ }); // query for the bulletins - queryBulletins(referencingComponentIds).done(function (response) { + nfCanvasUtils.queryBulletins(referencingComponentIds).done(function (response) { var bulletins = response.bulletinBoard.bulletins; updateReferencingComponentBulletins(bulletins); }); @@ -622,25 +622,6 @@ createReferenceBlock('Unauthorized', unauthorized); }; - /** - * Queries for bulletins for the specified components. - * - * @param {array} componentIds - * @returns {deferred} - */ - var queryBulletins = function (componentIds) { - var ids = componentIds.join('|'); - - return $.ajax({ - type: 'GET', - url: '../nifi-api/flow/bulletin-board', - data: { - sourceId: ids - }, - dataType: 'json' - }).fail(nfErrorHandler.handleAjaxError); - }; - /** * Sets whether the specified controller service is enabled. * @@ -688,7 +669,7 @@ return service.state === 'DISABLED'; } }, function (service) { - return queryBulletins([service.id]); + return nfCanvasUtils.queryBulletins([service.id]); }, pollCondition); // once the service has updated, resolve and render the updated service @@ -961,7 +942,7 @@ } }); - return queryBulletins(referencingSchedulableComponents); + return nfCanvasUtils.queryBulletins(referencingSchedulableComponents); }, pollCondition); }; @@ -1006,7 +987,7 @@ } }); - return queryBulletins(referencingSchedulableComponents); + return nfCanvasUtils.queryBulletins(referencingSchedulableComponents); }, pollCondition); }; @@ -1051,7 +1032,7 @@ } }); - return queryBulletins(referencingSchedulableComponents); + return nfCanvasUtils.queryBulletins(referencingSchedulableComponents); }, pollCondition); }; @@ -1164,7 +1145,7 @@ $('#disable-controller-service-dialog').modal('setButtonModel', buttons).modal('show'); // load the bulletins - queryBulletins([controllerService.id]).done(function (response) { + nfCanvasUtils.queryBulletins([controllerService.id]).done(function (response) { updateBulletins(response.bulletinBoard.bulletins, $('#disable-controller-service-bulletins')); }); @@ -1216,7 +1197,7 @@ $('#enable-controller-service-dialog').modal('setButtonModel', buttons).modal('show'); // load the bulletins - queryBulletins([controllerService.id]).done(function (response) { + nfCanvasUtils.queryBulletins([controllerService.id]).done(function (response) { updateBulletins(response.bulletinBoard.bulletins, $('#enable-controller-service-bulletins')); }); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js new file mode 100644 index 0000000000..47b77ac851 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js @@ -0,0 +1,1633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* global define, module, require, exports */ + +/** + * Opens the variable registry for a given Process Group. + */ +(function (root, factory) { + if (typeof define === 'function' && define.amd) { + define(['jquery', + 'd3', + 'Slick', + 'nf.Canvas', + 'nf.CanvasUtils', + 'nf.ErrorHandler', + 'nf.Dialog', + 'nf.Client', + 'nf.Common', + 'nf.ng.Bridge', + 'nf.Processor', + 'nf.ProcessGroup', + 'nf.ProcessGroupConfiguration'], + function ($, d3, Slick, nfCanvas, nfCanvasUtils, nfErrorHandler, nfDialog, nfClient, nfCommon, nfNgBridge, nfProcessor, nfProcessGroup, nfProcessGroupConfiguration) { + return (nf.ComponentState = factory($, d3, Slick, nfCanvas, nfCanvasUtils, nfErrorHandler, nfDialog, nfClient, nfCommon, nfNgBridge, nfProcessor, nfProcessGroup, nfProcessGroupConfiguration)); + }); + } else if (typeof exports === 'object' && typeof module === 'object') { + module.exports = (nf.ComponentState = + factory(require('jquery'), + require('d3'), + require('Slick'), + require('nf.Canvas'), + require('nf.CanvasUtils'), + require('nf.ErrorHandler'), + require('nf.Dialog'), + require('nf.Client'), + require('nf.Common'), + require('nf.ng.Bridge'), + require('nf.Processor'), + require('nf.ProcessGroup'), + require('nf.ProcessGroupConfiguration'))); + } else { + nf.VariableRegistry = factory(root.$, + root.d3, + root.Slick, + root.nf.Canvas, + root.nf.CanvasUtils, + root.nf.ErrorHandler, + root.nf.Dialog, + root.nf.Client, + root.nf.Common, + root.nf.ng.Bridge, + root.nf.Processor, + root.nf.ProcessGroup, + root.nf.ProcessGroupConfiguration); + } +}(this, function ($, d3, Slick, nfCanvas, nfCanvasUtils, nfErrorHandler, nfDialog, nfClient, nfCommon, nfNgBridge, nfProcessor, nfProcessGroup, nfProcessGroupConfiguration) { + 'use strict'; + + // text editor + var textEditor = function (args) { + var scope = this; + var initialValue = ''; + var previousValue; + var wrapper; + var isEmpty; + var input; + + this.init = function () { + var container = $('body'); + + // record the previous value + previousValue = args.item[args.column.field]; + + // create the wrapper + wrapper = $('
').addClass('slickgrid-editor').css({ + 'z-index': 100000, + 'position': 'absolute', + 'border-radius': '2px', + 'box-shadow': 'rgba(0, 0, 0, 0.247059) 0px 2px 5px', + 'background-color': 'rgb(255, 255, 255)', + 'overflow': 'hidden', + 'padding': '10px 20px', + 'cursor': 'move', + 'transform': 'translate3d(0px, 0px, 0px)' + }).appendTo(container); + + // create the input field + input = $('