mirror of https://github.com/apache/nifi.git
NIFI-4436:
- Updating front end to use version control state/status. - Fixing copy/paste issue during revert local changes. - Code clean up in the breadcrumbs. - Update VersionsResource authorization and two phase commit object usage.
This commit is contained in:
parent
fdef5b5605
commit
49aad2c3a8
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.web.api.entity;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
@XmlRootElement(name = "createActiveRequest")
|
||||
public class CreateActiveRequestEntity extends Entity {
|
||||
private String processGroupId;
|
||||
|
||||
@ApiModelProperty("The Process Group ID that this active request will update")
|
||||
public String getProcessGroupId() {
|
||||
return processGroupId;
|
||||
}
|
||||
|
||||
public void setProcessGroupId(String processGroupId) {
|
||||
this.processGroupId = processGroupId;
|
||||
}
|
||||
}
|
|
@ -17,23 +17,14 @@
|
|||
|
||||
package org.apache.nifi.web.api;
|
||||
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedHashMap;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import io.swagger.annotations.ApiParam;
|
||||
import io.swagger.annotations.ApiResponse;
|
||||
import io.swagger.annotations.ApiResponses;
|
||||
import io.swagger.annotations.Authorization;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.authorization.AccessDeniedException;
|
||||
import org.apache.nifi.authorization.Authorizer;
|
||||
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
|
||||
import org.apache.nifi.authorization.RequestAction;
|
||||
|
@ -65,6 +56,7 @@ import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
|
|||
import org.apache.nifi.web.api.dto.VersionedFlowDTO;
|
||||
import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO;
|
||||
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
|
||||
import org.apache.nifi.web.api.entity.CreateActiveRequestEntity;
|
||||
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
|
||||
import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
|
||||
import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
|
||||
|
@ -80,6 +72,21 @@ import org.apache.nifi.web.util.LifecycleManagementException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedHashMap;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -95,13 +102,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import io.swagger.annotations.ApiParam;
|
||||
import io.swagger.annotations.ApiResponse;
|
||||
import io.swagger.annotations.ApiResponses;
|
||||
import io.swagger.annotations.Authorization;
|
||||
|
||||
@Path("/versions")
|
||||
@Api(value = "/versions", description = "Endpoint for managing version control for a flow")
|
||||
public class VersionsResource extends ApplicationResource {
|
||||
|
@ -168,7 +168,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
@POST
|
||||
@Consumes(MediaType.WILDCARD)
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Path("start-requests")
|
||||
@Path("active-requests")
|
||||
@ApiOperation(
|
||||
value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed. Creating this request will "
|
||||
+ "prevent any other threads from simultaneously saving local changes to Version Control. It will not, however, actually save the local flow to the Flow Registry. A "
|
||||
|
@ -182,20 +182,28 @@ public class VersionsResource extends ApplicationResource {
|
|||
@ApiResponse(code = 404, message = "The specified resource could not be found."),
|
||||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||
})
|
||||
public Response createVersionControlRequest() throws InterruptedException {
|
||||
public Response createVersionControlRequest(
|
||||
@ApiParam(value = "The versioned flow details.", required = true) final CreateActiveRequestEntity requestEntity) throws InterruptedException {
|
||||
|
||||
if (isReplicateRequest()) {
|
||||
return replicate(HttpMethod.POST);
|
||||
}
|
||||
|
||||
if (requestEntity.getProcessGroupId() == null) {
|
||||
throw new IllegalArgumentException("The id of the process group that will be updated must be specified.");
|
||||
}
|
||||
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
|
||||
return withWriteLock(
|
||||
serviceFacade,
|
||||
/* entity */ null,
|
||||
requestEntity,
|
||||
lookup -> {
|
||||
// TODO - pass in PG ID to authorize
|
||||
final Authorizable processGroup = lookup.getProcessGroup(requestEntity.getProcessGroupId()).getAuthorizable();
|
||||
processGroup.authorize(authorizer, RequestAction.WRITE, user);
|
||||
},
|
||||
/* verifier */ null,
|
||||
requestEntity -> {
|
||||
entity -> {
|
||||
final String requestId = generateUuid();
|
||||
|
||||
// We need to ensure that only a single Version Control Request can occur throughout the flow.
|
||||
|
@ -204,7 +212,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
// As a result, may could end up in a situation where we are creating flows in the registry that are never referenced.
|
||||
synchronized (activeRequestMonitor) {
|
||||
if (activeRequest == null || activeRequest.isExpired()) {
|
||||
activeRequest = new ActiveRequest(requestId);
|
||||
activeRequest = new ActiveRequest(requestId, user, entity.getProcessGroupId());
|
||||
} else {
|
||||
throw new IllegalStateException("A request is already underway to place a Process Group in this NiFi instance under Version Control. "
|
||||
+ "Only a single such request is allowed to occurred at a time. Please try the request again momentarily.");
|
||||
|
@ -219,7 +227,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
@PUT
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Path("start-requests/{id}")
|
||||
@Path("active-requests/{id}")
|
||||
@ApiOperation(
|
||||
value = "Updates the request with the given ID",
|
||||
response = VersionControlInformationEntity.class,
|
||||
|
@ -235,7 +243,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||
})
|
||||
public Response updateVersionControlRequest(@ApiParam("The request ID.") @PathParam("id") final String requestId,
|
||||
@ApiParam(value = "The controller service configuration details.", required = true) final VersionControlComponentMappingEntity requestEntity) {
|
||||
@ApiParam(value = "The version control component mapping.", required = true) final VersionControlComponentMappingEntity requestEntity) {
|
||||
|
||||
// Verify request
|
||||
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
|
||||
|
@ -287,21 +295,40 @@ public class VersionsResource extends ApplicationResource {
|
|||
throw new IllegalStateException("Version Control Request with ID " + requestId + " has already expired");
|
||||
}
|
||||
|
||||
if (activeRequest.isUpdatePerformed()) {
|
||||
throw new IllegalStateException("Version Control Request with ID " + requestId + " has already been performed");
|
||||
}
|
||||
|
||||
final String groupId = requestEntity.getVersionControlInformation().getGroupId();
|
||||
|
||||
if (!activeRequest.getProcessGroupId().equals(groupId)) {
|
||||
throw new IllegalStateException("Version Control Request with ID " + requestId + " was created for a different process group id");
|
||||
}
|
||||
|
||||
final Revision groupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
|
||||
return withWriteLock(
|
||||
serviceFacade,
|
||||
requestEntity,
|
||||
groupRevision,
|
||||
lookup -> {
|
||||
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
|
||||
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
if (user == null) {
|
||||
throw new AccessDeniedException("Unknown user.");
|
||||
}
|
||||
|
||||
if (!user.equals(activeRequest.getUser())) {
|
||||
throw new AccessDeniedException("Only the user that creates the Version Control Request can use it.");
|
||||
}
|
||||
},
|
||||
null,
|
||||
(rev, mappingEntity) -> {
|
||||
// set the version control information
|
||||
final VersionControlInformationEntity responseEntity = serviceFacade.setVersionControlInformation(rev, groupId,
|
||||
mappingEntity.getVersionControlInformation(), mappingEntity.getVersionControlComponentMapping());
|
||||
|
||||
// indicate that the active request has performed the update
|
||||
activeRequest.updatePerformed();
|
||||
|
||||
return generateOkResponse(responseEntity).build();
|
||||
});
|
||||
}
|
||||
|
@ -311,10 +338,10 @@ public class VersionsResource extends ApplicationResource {
|
|||
@DELETE
|
||||
@Consumes(MediaType.WILDCARD)
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Path("start-requests/{id}")
|
||||
@Path("active-requests/{id}")
|
||||
@ApiOperation(
|
||||
value = "Deletes the Version Control Request with the given ID. This will allow other threads to save flows to the Flow Registry. See also the documentation "
|
||||
+ "for POSTing to /versions/start-requests for information regarding why this is done.",
|
||||
+ "for POSTing to /versions/active-requests for information regarding why this is done.",
|
||||
notes = NON_GUARANTEED_ENDPOINT)
|
||||
@ApiResponses(value = {
|
||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
||||
|
@ -328,13 +355,6 @@ public class VersionsResource extends ApplicationResource {
|
|||
return replicate(HttpMethod.DELETE);
|
||||
}
|
||||
|
||||
return withWriteLock(
|
||||
serviceFacade,
|
||||
null,
|
||||
lookup -> {
|
||||
},
|
||||
null,
|
||||
requestEntity -> {
|
||||
synchronized (activeRequestMonitor) {
|
||||
if (activeRequest == null) {
|
||||
throw new IllegalStateException("No Version Control Request with ID " + requestId + " is currently active");
|
||||
|
@ -344,12 +364,27 @@ public class VersionsResource extends ApplicationResource {
|
|||
throw new IllegalStateException("No Version Control Request with ID " + requestId + " is currently active");
|
||||
}
|
||||
|
||||
activeRequest = null;
|
||||
return withWriteLock(
|
||||
serviceFacade,
|
||||
null,
|
||||
lookup -> {
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
if (user == null) {
|
||||
throw new AccessDeniedException("Unknown user.");
|
||||
}
|
||||
|
||||
if (!user.equals(activeRequest.getUser())) {
|
||||
throw new AccessDeniedException("Only the user that creates the Version Control Request can use it.");
|
||||
}
|
||||
},
|
||||
null,
|
||||
requestEntity -> {
|
||||
// clear the active request
|
||||
activeRequest = null;
|
||||
|
||||
return generateOkResponse().build();
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -407,7 +442,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
|
||||
if (isReplicateRequest()) {
|
||||
// We first have to obtain a "lock" on all nodes in the cluster so that multiple Version Control requests
|
||||
// are not being made simultaneously. We do this by making a POST to /nifi-api/versions/start-requests.
|
||||
// are not being made simultaneously. We do this by making a POST to /nifi-api/versions/active-requests.
|
||||
// The Response gives us back the Request ID.
|
||||
final URI requestUri;
|
||||
try {
|
||||
|
@ -415,7 +450,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
final String requestId = lockVersionControl(originalUri, groupId);
|
||||
|
||||
requestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||
originalUri.getPort(), "/nifi-api/versions/start-requests/" + requestId, null, originalUri.getFragment());
|
||||
originalUri.getPort(), "/nifi-api/versions/active-requests/" + requestId, null, originalUri.getFragment());
|
||||
} catch (final URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -448,9 +483,12 @@ public class VersionsResource extends ApplicationResource {
|
|||
lookup -> {
|
||||
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
|
||||
final Authorizable processGroup = groupAuthorizable.getAuthorizable();
|
||||
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
||||
|
||||
// require write to this group
|
||||
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||
super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
|
||||
|
||||
// require read to this group and all descendants
|
||||
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
|
||||
},
|
||||
() -> {
|
||||
final VersionedFlowDTO versionedFlow = requestEntity.getVersionedFlow();
|
||||
|
@ -493,15 +531,19 @@ public class VersionsResource extends ApplicationResource {
|
|||
|
||||
private String lockVersionControl(final URI originalUri, final String groupId) throws URISyntaxException {
|
||||
final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||
originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment());
|
||||
originalUri.getPort(), "/nifi-api/versions/active-requests", null, originalUri.getFragment());
|
||||
|
||||
final NodeResponse clusterResponse;
|
||||
try {
|
||||
// create an active request entity to indicate the group id
|
||||
final CreateActiveRequestEntity activeRequestEntity = new CreateActiveRequestEntity();
|
||||
activeRequestEntity.setProcessGroupId(groupId);
|
||||
|
||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||
clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
||||
clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, activeRequestEntity, Collections.emptyMap()).awaitMergedResponse();
|
||||
} else {
|
||||
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
||||
getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
|
||||
getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, activeRequestEntity, Collections.emptyMap()).awaitMergedResponse();
|
||||
}
|
||||
} catch (final InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -625,7 +667,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
},
|
||||
(revision, groupEntity) -> {
|
||||
// disconnect from version control
|
||||
final VersionControlInformationEntity entity = serviceFacade.deleteVersionControl(requestRevision, groupId);
|
||||
final VersionControlInformationEntity entity = serviceFacade.deleteVersionControl(revision, groupId);
|
||||
|
||||
// generate the response
|
||||
return generateOkResponse(entity).build();
|
||||
|
@ -716,8 +758,8 @@ public class VersionsResource extends ApplicationResource {
|
|||
versionControlInfoDto.setGroupId(groupId);
|
||||
versionControlInfoDto.setModified(false);
|
||||
versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
|
||||
versionControlInfoDto.setRegistryId(requestEntity.getRegistryId());
|
||||
versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId()));
|
||||
versionControlInfoDto.setRegistryId(entity.getRegistryId());
|
||||
versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(entity.getRegistryId()));
|
||||
|
||||
final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
|
||||
entity.getUpdateDescendantVersionedFlows());
|
||||
|
@ -785,6 +827,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
|
||||
// request manager will ensure that the current is the user that submitted this request
|
||||
final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest = requestManager.getRequest(requestType, requestId, user);
|
||||
|
||||
final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
|
||||
|
@ -865,6 +908,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
|
||||
// request manager will ensure that the current is the user that submitted this request
|
||||
final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest = requestManager.removeRequest(requestType, requestId, user);
|
||||
if (asyncRequest == null) {
|
||||
throw new ResourceNotFoundException("Could not find request of type " + requestType + " with ID " + requestId);
|
||||
|
@ -1021,11 +1065,8 @@ public class VersionsResource extends ApplicationResource {
|
|||
lookup -> {
|
||||
// Step 2: Verify READ and WRITE permissions for user, for every component.
|
||||
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
|
||||
final Authorizable processGroup = groupAuthorizable.getAuthorizable();
|
||||
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
||||
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||
super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
|
||||
super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true);
|
||||
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
|
||||
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true);
|
||||
|
||||
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
|
||||
final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
|
||||
|
@ -1049,7 +1090,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
|
||||
try {
|
||||
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
|
||||
affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true, true);
|
||||
affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, true, true);
|
||||
|
||||
vcur.markComplete(updatedVersionControlEntity);
|
||||
} catch (final LifecycleManagementException e) {
|
||||
|
@ -1170,11 +1211,8 @@ public class VersionsResource extends ApplicationResource {
|
|||
lookup -> {
|
||||
// Step 2: Verify READ and WRITE permissions for user, for every component.
|
||||
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
|
||||
final Authorizable processGroup = groupAuthorizable.getAuthorizable();
|
||||
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
|
||||
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||
super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
|
||||
super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true);
|
||||
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
|
||||
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true);
|
||||
|
||||
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
|
||||
final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
|
||||
|
@ -1217,7 +1255,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
|
||||
try {
|
||||
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
|
||||
affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, true);
|
||||
affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, false, true);
|
||||
|
||||
vcur.markComplete(updatedVersionControlEntity);
|
||||
} catch (final LifecycleManagementException e) {
|
||||
|
@ -1471,11 +1509,17 @@ public class VersionsResource extends ApplicationResource {
|
|||
private static final long MAX_REQUEST_LOCK_NANOS = TimeUnit.MINUTES.toNanos(1L);
|
||||
|
||||
private final String requestId;
|
||||
private final NiFiUser user;
|
||||
private final String processGroupId;
|
||||
private final long creationNanos = System.nanoTime();
|
||||
private final long expirationTime = creationNanos + MAX_REQUEST_LOCK_NANOS;
|
||||
|
||||
private ActiveRequest(final String requestId) {
|
||||
private boolean updatePerformed = false;
|
||||
|
||||
private ActiveRequest(final String requestId, final NiFiUser user, final String processGroupId) {
|
||||
this.requestId = requestId;
|
||||
this.user = user;
|
||||
this.processGroupId = processGroupId;
|
||||
}
|
||||
|
||||
public boolean isExpired() {
|
||||
|
@ -1485,5 +1529,21 @@ public class VersionsResource extends ApplicationResource {
|
|||
public String getRequestId() {
|
||||
return requestId;
|
||||
}
|
||||
|
||||
public NiFiUser getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public String getProcessGroupId() {
|
||||
return processGroupId;
|
||||
}
|
||||
|
||||
public void updatePerformed() {
|
||||
updatePerformed = true;
|
||||
}
|
||||
|
||||
public boolean isUpdatePerformed() {
|
||||
return updatePerformed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,7 @@
|
|||
highlight-crumb-id="appCtrl.nf.CanvasUtils.getGroupId();"
|
||||
separator-func="appCtrl.nf.Common.isDefinedAndNotNull"
|
||||
is-tracking="appCtrl.serviceProvider.breadcrumbsCtrl.isTracking"
|
||||
is-current="appCtrl.serviceProvider.breadcrumbsCtrl.isCurrent"
|
||||
is-modified="appCtrl.serviceProvider.breadcrumbsCtrl.isModified"
|
||||
get-version-control-class="appCtrl.serviceProvider.breadcrumbsCtrl.getVersionControlClass"
|
||||
get-version-control-tooltip="appCtrl.serviceProvider.breadcrumbsCtrl.getVersionControlTooltip">
|
||||
</nf-breadcrumbs>
|
||||
<div id="graph-controls">
|
||||
|
|
|
@ -161,7 +161,7 @@ div.context-menu-provenance {
|
|||
text-shadow: 0 0 4px rgba(255,255,255,1);
|
||||
}
|
||||
|
||||
.locally-modified {
|
||||
.locally-modified, .sync-failure {
|
||||
float: left;
|
||||
color: #747474 !important;
|
||||
fill: #747474 !important;
|
||||
|
@ -169,15 +169,7 @@ div.context-menu-provenance {
|
|||
text-shadow: 0 0 4px rgba(255,255,255,1);
|
||||
}
|
||||
|
||||
.stale {
|
||||
float: left;
|
||||
color: #c7685d !important;
|
||||
fill: #c7685d !important;
|
||||
margin-top: 0px !important;
|
||||
text-shadow: 0 0 4px rgba(255,255,255,1);
|
||||
}
|
||||
|
||||
.locally-modified-and-stale {
|
||||
.stale, .locally-modified-and-stale {
|
||||
float: left;
|
||||
color: #c7685d !important;
|
||||
fill: #c7685d !important;
|
||||
|
|
|
@ -107,31 +107,28 @@
|
|||
},
|
||||
|
||||
/**
|
||||
* Returns whether the specified version control information is current.
|
||||
* Returns the class string to use for the version control of the specified breadcrumb.
|
||||
*
|
||||
* @param breadcrumbEntity
|
||||
* @returns {boolean}
|
||||
* @returns {string}
|
||||
*/
|
||||
isCurrent: function (breadcrumbEntity) {
|
||||
getVersionControlClass: function (breadcrumbEntity) {
|
||||
if (nfCommon.isDefinedAndNotNull(breadcrumbEntity.breadcrumb.versionControlInformation)) {
|
||||
return breadcrumbEntity.breadcrumb.versionControlInformation.current === true;
|
||||
var vciState = breadcrumbEntity.breadcrumb.versionControlInformation.state;
|
||||
if (vciState === 'SYNC_FAILURE') {
|
||||
return 'breadcrumb-version-control-gray fa fa-question'
|
||||
} else if (vciState === 'LOCALLY_MODIFIED_AND_STALE') {
|
||||
return 'breadcrumb-version-control-red fa fa-exclamation-circle';
|
||||
} else if (vciState === 'STALE') {
|
||||
return 'breadcrumb-version-control-red fa fa-arrow-circle-up';
|
||||
} else if (vciState === 'LOCALLY_MODIFIED') {
|
||||
return 'breadcrumb-version-control-gray fa fa-asterisk';
|
||||
} else {
|
||||
return 'breadcrumb-version-control-green fa fa-check';
|
||||
}
|
||||
|
||||
return false;
|
||||
},
|
||||
|
||||
/**
|
||||
* Returns whether the specified version control information is current.
|
||||
*
|
||||
* @param versionControlInformation
|
||||
* @returns {boolean}
|
||||
*/
|
||||
isModified: function (breadcrumbEntity) {
|
||||
if (nfCommon.isDefinedAndNotNull(breadcrumbEntity.breadcrumb.versionControlInformation)) {
|
||||
return breadcrumbEntity.breadcrumb.versionControlInformation.modified === true;
|
||||
} else {
|
||||
return '';
|
||||
}
|
||||
|
||||
return false;
|
||||
},
|
||||
|
||||
/**
|
||||
|
|
|
@ -42,8 +42,7 @@
|
|||
'highlightCrumbId': '=',
|
||||
'separatorFunc': '=',
|
||||
'isTracking': '=',
|
||||
'isCurrent': '=',
|
||||
'isModified': '=',
|
||||
'getVersionControlClass': '=',
|
||||
'getVersionControlTooltip': '='
|
||||
},
|
||||
link: function (scope, element, attrs) {
|
||||
|
|
|
@ -463,7 +463,7 @@
|
|||
}
|
||||
|
||||
// check the selection for version control information
|
||||
return versionControlInformation.current === true && versionControlInformation.modified === true;
|
||||
return versionControlInformation.state === 'LOCALLY_MODIFIED';
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -502,7 +502,7 @@
|
|||
}
|
||||
|
||||
// check the selection for version control information
|
||||
return versionControlInformation.modified === true;
|
||||
return versionControlInformation.state === 'LOCALLY_MODIFIED' || versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE';
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -541,7 +541,7 @@
|
|||
}
|
||||
|
||||
// check the selection for version control information
|
||||
return versionControlInformation.modified === false;
|
||||
return versionControlInformation.state !== 'LOCALLY_MODIFIED' && versionControlInformation.state !== 'LOCALLY_MODIFIED_AND_STALE';
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -227,10 +227,11 @@
|
|||
*
|
||||
* @param registryIdentifier
|
||||
* @param bucketCombo
|
||||
* @param flowCombo
|
||||
* @param selectBucket
|
||||
* @returns {*}
|
||||
*/
|
||||
var loadBuckets = function (registryIdentifier, bucketCombo, selectBucket) {
|
||||
var loadBuckets = function (registryIdentifier, bucketCombo, flowCombo, selectBucket) {
|
||||
return $.ajax({
|
||||
type: 'GET',
|
||||
url: '../nifi-api/flow/registries/' + encodeURIComponent(registryIdentifier) + '/buckets',
|
||||
|
@ -258,6 +259,14 @@
|
|||
optionClass: 'unset',
|
||||
disabled: true
|
||||
});
|
||||
flowCombo.combo('destroy').combo({
|
||||
options: [{
|
||||
text: 'No available flows',
|
||||
value: null,
|
||||
optionClass: 'unset',
|
||||
disabled: true
|
||||
}]
|
||||
});
|
||||
}
|
||||
|
||||
// load the buckets
|
||||
|
@ -287,6 +296,14 @@
|
|||
disabled: true
|
||||
}]
|
||||
});
|
||||
flowCombo.combo('destroy').combo({
|
||||
options: [{
|
||||
text: 'No available flows',
|
||||
value: null,
|
||||
optionClass: 'unset',
|
||||
disabled: true
|
||||
}]
|
||||
});
|
||||
|
||||
dialog.modal('refreshButtons');
|
||||
};
|
||||
|
@ -316,7 +333,7 @@
|
|||
clearFlowVersionsGrid();
|
||||
}
|
||||
|
||||
loadBuckets(selectedOption.value, bucketCombo, selectBucket).fail(function () {
|
||||
loadBuckets(selectedOption.value, bucketCombo, flowCombo, selectBucket).fail(function () {
|
||||
showNoBucketsAvailable();
|
||||
});
|
||||
}
|
||||
|
@ -1439,7 +1456,7 @@
|
|||
|
||||
nfDialog.showOkDialog({
|
||||
headerText: 'Revert Local Changes',
|
||||
dialogContent: nfCommon.escapeHtml(changeRequest.failureReason)
|
||||
dialogContent: nfCommon.escapeHtml(revertRequest.failureReason)
|
||||
});
|
||||
} else {
|
||||
// update the percent complete
|
||||
|
|
|
@ -397,7 +397,7 @@
|
|||
'class': 'process-group-disabled-count process-group-contents-count'
|
||||
});
|
||||
|
||||
// current icon
|
||||
// up to date icon
|
||||
details.append('text')
|
||||
.attr({
|
||||
'x': 10,
|
||||
|
@ -409,7 +409,7 @@
|
|||
})
|
||||
.text('\uf00c');
|
||||
|
||||
// current count
|
||||
// up to date count
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
|
@ -418,7 +418,7 @@
|
|||
'class': 'process-group-up-to-date-count process-group-contents-count'
|
||||
});
|
||||
|
||||
// modified icon
|
||||
// locally modified icon
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
|
@ -429,7 +429,7 @@
|
|||
})
|
||||
.text('\uf069');
|
||||
|
||||
// modified count
|
||||
// locally modified count
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
|
@ -438,7 +438,7 @@
|
|||
'class': 'process-group-locally-modified-count process-group-contents-count'
|
||||
});
|
||||
|
||||
// not current icon
|
||||
// stale icon
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
|
@ -449,7 +449,7 @@
|
|||
})
|
||||
.text('\uf0aa');
|
||||
|
||||
// not current count
|
||||
// stale count
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
|
@ -458,7 +458,7 @@
|
|||
'class': 'process-group-stale-count process-group-contents-count'
|
||||
});
|
||||
|
||||
// modified and not current icon
|
||||
// locally modified and stale icon
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
|
@ -469,7 +469,7 @@
|
|||
})
|
||||
.text('\uf06a');
|
||||
|
||||
// modified and not current count
|
||||
// locally modified and stale count
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
|
@ -478,6 +478,26 @@
|
|||
'class': 'process-group-locally-modified-and-stale-count process-group-contents-count'
|
||||
});
|
||||
|
||||
// sync failure icon
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
return processGroupData.dimensions.height - 7;
|
||||
},
|
||||
'class': 'process-group-sync-failure process-group-contents-icon',
|
||||
'font-family': 'FontAwesome'
|
||||
})
|
||||
.text('\uf128');
|
||||
|
||||
// sync failure count
|
||||
details.append('text')
|
||||
.attr({
|
||||
'y': function () {
|
||||
return processGroupData.dimensions.height - 7;
|
||||
},
|
||||
'class': 'process-group-sync-failure-count process-group-contents-count'
|
||||
});
|
||||
|
||||
// ----------------
|
||||
// stats background
|
||||
// ----------------
|
||||
|
@ -940,13 +960,14 @@
|
|||
'visibility': isUnderVersionControl(processGroupData) ? 'visible' : 'hidden',
|
||||
'fill': function () {
|
||||
if (isUnderVersionControl(processGroupData)) {
|
||||
var modified = processGroupData.component.versionControlInformation.modified;
|
||||
var current = processGroupData.component.versionControlInformation.current;
|
||||
if (modified === true && current === false) {
|
||||
var vciState = processGroupData.component.versionControlInformation.state;
|
||||
if (vciState === 'SYNC_FAILURE') {
|
||||
return '#666666';
|
||||
} else if (vciState === 'LOCALLY_MODIFIED_AND_STALE') {
|
||||
return '#BA554A';
|
||||
} else if (current === false) {
|
||||
} else if (vciState === 'STALE') {
|
||||
return '#BA554A';
|
||||
} else if (modified === true) {
|
||||
} else if (vciState === 'LOCALLY_MODIFIED') {
|
||||
return '#666666';
|
||||
} else {
|
||||
return '#1A9964';
|
||||
|
@ -958,13 +979,14 @@
|
|||
})
|
||||
.text(function () {
|
||||
if (isUnderVersionControl(processGroupData)) {
|
||||
var modified = processGroupData.component.versionControlInformation.modified;
|
||||
var current = processGroupData.component.versionControlInformation.current;
|
||||
if (modified === true && current === false) {
|
||||
var vciState = processGroupData.component.versionControlInformation.state;
|
||||
if (vciState === 'SYNC_FAILURE') {
|
||||
return '\uf128'
|
||||
} else if (vciState === 'LOCALLY_MODIFIED_AND_STALE') {
|
||||
return '\uf06a';
|
||||
} else if (current === false) {
|
||||
} else if (vciState === 'STALE') {
|
||||
return '\uf0aa';
|
||||
} else if (modified === true) {
|
||||
} else if (vciState === 'LOCALLY_MODIFIED') {
|
||||
return '\uf069';
|
||||
} else {
|
||||
return '\uf00c';
|
||||
|
@ -1081,8 +1103,8 @@
|
|||
});
|
||||
var upToDateCount = details.select('text.process-group-up-to-date-count')
|
||||
.attr('x', function () {
|
||||
var currentCountX = parseInt(upToDate.attr('x'), 10);
|
||||
return currentCountX + Math.round(upToDate.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
var updateToDateCountX = parseInt(upToDate.attr('x'), 10);
|
||||
return updateToDateCountX + Math.round(upToDate.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
})
|
||||
.text(function (d) {
|
||||
return d.component.upToDateCount;
|
||||
|
@ -1097,13 +1119,13 @@
|
|||
return d.component.locallyModifiedCount === 0;
|
||||
})
|
||||
.attr('x', function () {
|
||||
var currentX = parseInt(upToDateCount.attr('x'), 10);
|
||||
return currentX + Math.round(upToDateCount.node().getComputedTextLength()) + CONTENTS_SPACER;
|
||||
var upToDateX = parseInt(upToDateCount.attr('x'), 10);
|
||||
return upToDateX + Math.round(upToDateCount.node().getComputedTextLength()) + CONTENTS_SPACER;
|
||||
});
|
||||
var locallyModifiedCount = details.select('text.process-group-locally-modified-count')
|
||||
.attr('x', function () {
|
||||
var modifiedCountX = parseInt(locallyModified.attr('x'), 10);
|
||||
return modifiedCountX + Math.round(locallyModified.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
var locallyModifiedCountX = parseInt(locallyModified.attr('x'), 10);
|
||||
return locallyModifiedCountX + Math.round(locallyModified.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
})
|
||||
.text(function (d) {
|
||||
return d.component.locallyModifiedCount;
|
||||
|
@ -1118,13 +1140,13 @@
|
|||
return d.component.staleCount === 0;
|
||||
})
|
||||
.attr('x', function () {
|
||||
var modifiedX = parseInt(locallyModifiedCount.attr('x'), 10);
|
||||
return modifiedX + Math.round(locallyModifiedCount.node().getComputedTextLength()) + CONTENTS_SPACER;
|
||||
var locallyModifiedX = parseInt(locallyModifiedCount.attr('x'), 10);
|
||||
return locallyModifiedX + Math.round(locallyModifiedCount.node().getComputedTextLength()) + CONTENTS_SPACER;
|
||||
});
|
||||
var staleCount = details.select('text.process-group-stale-count')
|
||||
.attr('x', function () {
|
||||
var notCurrentCountX = parseInt(stale.attr('x'), 10);
|
||||
return notCurrentCountX + Math.round(stale.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
var staleCountX = parseInt(stale.attr('x'), 10);
|
||||
return staleCountX + Math.round(stale.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
})
|
||||
.text(function (d) {
|
||||
return d.component.staleCount;
|
||||
|
@ -1139,17 +1161,38 @@
|
|||
return d.component.locallyModifiedAndStaleCount === 0;
|
||||
})
|
||||
.attr('x', function () {
|
||||
var runningX = parseInt(staleCount.attr('x'), 10);
|
||||
return runningX + Math.round(staleCount.node().getComputedTextLength()) + CONTENTS_SPACER;
|
||||
var staleX = parseInt(staleCount.attr('x'), 10);
|
||||
return staleX + Math.round(staleCount.node().getComputedTextLength()) + CONTENTS_SPACER;
|
||||
});
|
||||
details.select('text.process-group-locally-modified-and-stale-count')
|
||||
var locallyModifiedAndStaleCount = details.select('text.process-group-locally-modified-and-stale-count')
|
||||
.attr('x', function () {
|
||||
var modifiedAndNotCurrentCountX = parseInt(locallyModifiedAndStale.attr('x'), 10);
|
||||
return modifiedAndNotCurrentCountX + Math.round(locallyModifiedAndStale.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
var locallyModifiedAndStaleCountX = parseInt(locallyModifiedAndStale.attr('x'), 10);
|
||||
return locallyModifiedAndStaleCountX + Math.round(locallyModifiedAndStale.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
})
|
||||
.text(function (d) {
|
||||
return d.component.locallyModifiedAndStaleCount;
|
||||
});
|
||||
|
||||
// update sync failure
|
||||
var syncFailure = details.select('text.process-group-sync-failure')
|
||||
.classed('sync-failure', function (d) {
|
||||
return d.component.syncFailureCount > 0;
|
||||
})
|
||||
.classed('zero', function (d) {
|
||||
return d.component.syncFailureCount === 0;
|
||||
})
|
||||
.attr('x', function () {
|
||||
var syncFailureX = parseInt(locallyModifiedAndStaleCount.attr('x'), 10);
|
||||
return syncFailureX + Math.round(locallyModifiedAndStaleCount.node().getComputedTextLength()) + CONTENTS_SPACER - 2;
|
||||
});
|
||||
details.select('text.process-group-sync-failure-count')
|
||||
.attr('x', function () {
|
||||
var syncFailureCountX = parseInt(syncFailure.attr('x'), 10);
|
||||
return syncFailureCountX + Math.round(syncFailure.node().getComputedTextLength()) + CONTENTS_VALUE_SPACER;
|
||||
})
|
||||
.text(function (d) {
|
||||
return d.component.syncFailureCount;
|
||||
});
|
||||
} else {
|
||||
// update version control information
|
||||
processGroup.select('text.version-control').style('visibility', false).text('');
|
||||
|
|
|
@ -425,17 +425,7 @@
|
|||
* @param versionControlInformation
|
||||
*/
|
||||
getVersionControlTooltip: function (versionControlInformation) {
|
||||
var modified = versionControlInformation.modified;
|
||||
var current = versionControlInformation.current;
|
||||
if (modified === true && current === false) {
|
||||
return 'Local changes have been made and a newer version of this flow is available';
|
||||
} else if (current === false) {
|
||||
return 'A newer version of this flow is available';
|
||||
} else if (modified === true) {
|
||||
return 'Local changes have been made';
|
||||
} else {
|
||||
return 'Flow version is current';
|
||||
}
|
||||
return versionControlInformation.stateExplanation;
|
||||
},
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,14 +22,7 @@ limitations under the License.
|
|||
<span ng-if="separatorFunc(crumb.parentBreadcrumb)" style="margin: 0 12px;">
|
||||
»
|
||||
</span>
|
||||
<span ng-if="isTracking(crumb) && isCurrent(crumb) && !isModified(crumb)" title="{{getVersionControlTooltip(crumb)}}"
|
||||
class="breadcrumb-version-control-green fa fa-check" style="margin: 0 6px;"></span>
|
||||
<span ng-if="isTracking(crumb) && isCurrent(crumb) && isModified(crumb)" title="{{getVersionControlTooltip(crumb)}}"
|
||||
class="breadcrumb-version-control-gray fa fa-asterisk" style="margin: 0 6px;"></span>
|
||||
<span ng-if="isTracking(crumb) && !isCurrent(crumb) && !isModified(crumb)" title="{{getVersionControlTooltip(crumb)}}"
|
||||
class="breadcrumb-version-control-red fa fa-arrow-circle-up" style="margin: 0 6px;"></span>
|
||||
<span ng-if="isTracking(crumb) && !isCurrent(crumb) && isModified(crumb)" title="{{getVersionControlTooltip(crumb)}}"
|
||||
class="breadcrumb-version-control-red fa fa-exclamation-circle" style="margin: 0 6px;"></span>
|
||||
<span ng-if="isTracking(crumb)" title="{{getVersionControlTooltip(crumb)}}" class="{{getVersionControlClass(crumb)}}" style="margin: 0 6px;"></span>
|
||||
<span class="link"
|
||||
ng-class="(highlightCrumbId === crumb.id) ? 'link-bold' : ''"
|
||||
ng-click="clickFunc(crumb.id)">
|
||||
|
|
Loading…
Reference in New Issue