NIFI-2635: - Re-using the original request during the second phase of the two phase commit. - Forwarding requests to the coordinator when received by a node.

This closes #933

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Matt Gilman 2016-08-24 15:39:04 -04:00 committed by jpercivall
parent e9da90812b
commit c2bfc4ef24
27 changed files with 1760 additions and 1303 deletions

View File

@ -34,6 +34,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>

View File

@ -38,10 +38,14 @@ public interface RequestReplicator {
public static final int NODE_CONTINUE_STATUS_CODE = 150;
/**
* Indicates that the request is intended to cancel a lock that was previously obtained without performing the action
* Indicates that the request is intended to cancel a transaction that was previously created without performing the action
*/
public static final String LOCK_CANCELATION_HEADER = "X-Cancel-Lock";
public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id";
public static final String REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER = "X-Cancel-Transaction";
/**
* Indicates that this is the second phase of the two phase commit and the execution of the action should proceed.
*/
public static final String REQUEST_EXECUTION_HTTP_HEADER = "X-Execution-Continue";
/**
* When we replicate a request across the cluster, we replicate it only from the cluster coordinator.
@ -106,6 +110,20 @@ public interface RequestReplicator {
*/
AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification);
/**
* Forwards a request to the Cluster Coordinator so that it is able to replicate the request to all nodes in the cluster.
*
* @param coordinatorNodeId the node identifier of the Cluster Coordinator
* @param method the HTTP method (e.g., POST, PUT)
* @param uri the base request URI (up to, but not including, the query string)
* @param entity an entity
* @param headers any HTTP headers
*
* @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
*/
AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, String method, URI uri, Object entity, Map<String, String> headers);
/**
* <p>
* Returns an AsyncClusterResponse that provides the most up-to-date status of the request with the given identifier.

View File

@ -43,6 +43,7 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,7 +76,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.nifi.util.NiFiProperties;
public class ThreadPoolRequestReplicator implements RequestReplicator {
@ -249,15 +249,29 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
lock.lock();
try {
logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null);
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
} finally {
lock.unlock();
}
} else {
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null);
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
}
}
@Override
public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier coordinatorNodeId, final String method, final URI uri, final Object entity, final Map<String, String> headers) {
// If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request,
// it knows that we are acting as a proxy on behalf of the current user.
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user != null && !user.isAnonymous()) {
final String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
}
return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false);
}
/**
* Replicates the request to all nodes in the given set of node identifiers
*
@ -268,11 +282,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* @param headers the HTTP Headers
* @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable.
* @param response the response to update with the results
* @param executionPhase <code>true</code> if this is the execution phase, <code>false</code> otherwise
*
* @return an AsyncClusterResponse that can be used to obtain the result
*/
private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
StandardAsyncClusterResponse response) {
StandardAsyncClusterResponse response, boolean executionPhase) {
// state validation
Objects.requireNonNull(nodeIds);
@ -355,6 +370,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
finalResponse.add(nodeResponse);
};
// instruct the node to actually perform the underlying action
if (mutableRequest && executionPhase) {
updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
}
// replicate the request to all nodes
final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
@ -368,12 +388,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
// Add the Lock Version ID to the headers so that it is used in all requests for this transaction
final String lockVersionId = UUID.randomUUID().toString();
headers.put(RequestReplicator.LOCK_VERSION_ID_HEADER, lockVersionId);
final Map<String, String> updatedHeaders = new HashMap<>(headers);
updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
final Map<String, String> validationHeaders = new HashMap<>(headers);
validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
final int numNodes = nodeIds.size();
final NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback() {
@ -404,12 +420,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// to all nodes and we are finished.
if (dissentingCount == 0) {
logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());
replicate(nodeIds, method, uri, entity, headers, false, clusterResponse);
replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true);
return;
}
final Map<String, String> cancelLockHeaders = new HashMap<>(updatedHeaders);
cancelLockHeaders.put(LOCK_CANCELATION_HEADER, "true");
final Map<String, String> cancelLockHeaders = new HashMap<>(headers);
cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
final Thread cancelLockThread = new Thread(new Runnable() {
@Override
public void run() {
@ -482,10 +498,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
};
// Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, completionCallback);
final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback);
// replicate the 'verification request' to all nodes
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders);
}
@ -500,9 +516,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
// Visible for testing - overriding this method makes it easy to verify behavior without actually making any web requests
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) {
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId,
final Map<String, String> headers) {
final ClientResponse clientResponse;
final long startNanos = System.nanoTime();
logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", method, uri, requestId, headers);
switch (method.toUpperCase()) {
case HttpMethod.DELETE:
@ -703,7 +721,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
final String requestId = headers.get("x-nifi-request-id");
logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId);
nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId);
nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId, headers);
} catch (final Exception e) {
nodeResponse = new NodeResponse(nodeId, method, uri, e);
logger.warn("Failed to replicate request {} {} to {} due to {}", method, uri.getPath(), nodeId, e);

View File

@ -164,7 +164,8 @@ public class TestThreadPoolRequestReplicator {
final ThreadPoolRequestReplicator replicator
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) {
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders) {
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
@ -285,7 +286,8 @@ public class TestThreadPoolRequestReplicator {
final ThreadPoolRequestReplicator replicator
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) {
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders) {
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
@ -327,7 +329,8 @@ public class TestThreadPoolRequestReplicator {
final ThreadPoolRequestReplicator replicator
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) {
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders) {
if (delayMillis > 0L) {
try {
Thread.sleep(delayMillis);

View File

@ -170,7 +170,7 @@ public class AccessPolicyResource extends ApplicationResource {
* Creates a new access policy.
*
* @param httpServletRequest request
* @param accessPolicyEntity An accessPolicyEntity.
* @param requestAccessPolicyEntity An accessPolicyEntity.
* @return An accessPolicyEntity.
*/
@POST
@ -197,22 +197,22 @@ public class AccessPolicyResource extends ApplicationResource {
@ApiParam(
value = "The access policy configuration details.",
required = true
) final AccessPolicyEntity accessPolicyEntity) {
) final AccessPolicyEntity requestAccessPolicyEntity) {
// ensure we're running with a configurable authorizer
if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) {
throw new IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER);
}
if (accessPolicyEntity == null || accessPolicyEntity.getComponent() == null) {
if (requestAccessPolicyEntity == null || requestAccessPolicyEntity.getComponent() == null) {
throw new IllegalArgumentException("Access policy details must be specified.");
}
if (accessPolicyEntity.getRevision() == null || (accessPolicyEntity.getRevision().getVersion() == null || accessPolicyEntity.getRevision().getVersion() != 0)) {
if (requestAccessPolicyEntity.getRevision() == null || (requestAccessPolicyEntity.getRevision().getVersion() == null || requestAccessPolicyEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Policy.");
}
final AccessPolicyDTO requestAccessPolicy = accessPolicyEntity.getComponent();
final AccessPolicyDTO requestAccessPolicy = requestAccessPolicyEntity.getComponent();
if (requestAccessPolicy.getId() != null) {
throw new IllegalArgumentException("Access policy ID cannot be specified.");
}
@ -225,24 +225,23 @@ public class AccessPolicyResource extends ApplicationResource {
RequestAction.valueOfValue(requestAccessPolicy.getAction());
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, accessPolicyEntity);
return replicate(HttpMethod.POST, requestAccessPolicyEntity);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
return withWriteLock(
serviceFacade,
requestAccessPolicyEntity,
lookup -> {
final Authorizable accessPolicies = lookup.getAccessPolicyByResource(requestAccessPolicy.getResource());
accessPolicies.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
},
null,
accessPolicyEntity -> {
final AccessPolicyDTO accessPolicy = accessPolicyEntity.getComponent();
// set the access policy id as appropriate
requestAccessPolicy.setId(generateUuid());
accessPolicy.setId(generateUuid());
// get revision from the config
final RevisionDTO revisionDTO = accessPolicyEntity.getRevision();
@ -255,6 +254,8 @@ public class AccessPolicyResource extends ApplicationResource {
// build the response
return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build();
}
);
}
/**
* Retrieves the specified access policy.
@ -316,7 +317,7 @@ public class AccessPolicyResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the access policy to update.
* @param accessPolicyEntity An accessPolicyEntity.
* @param requestAccessPolicyEntity An accessPolicyEntity.
* @return An accessPolicyEntity.
*/
@PUT
@ -349,43 +350,46 @@ public class AccessPolicyResource extends ApplicationResource {
@ApiParam(
value = "The access policy configuration details.",
required = true
) final AccessPolicyEntity accessPolicyEntity) {
) final AccessPolicyEntity requestAccessPolicyEntity) {
// ensure we're running with a configurable authorizer
if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) {
throw new IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER);
}
if (accessPolicyEntity == null || accessPolicyEntity.getComponent() == null) {
if (requestAccessPolicyEntity == null || requestAccessPolicyEntity.getComponent() == null) {
throw new IllegalArgumentException("Access policy details must be specified.");
}
if (accessPolicyEntity.getRevision() == null) {
if (requestAccessPolicyEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final AccessPolicyDTO accessPolicyDTO = accessPolicyEntity.getComponent();
if (!id.equals(accessPolicyDTO.getId())) {
final AccessPolicyDTO requestAccessPolicyDTO = requestAccessPolicyEntity.getComponent();
if (!id.equals(requestAccessPolicyDTO.getId())) {
throw new IllegalArgumentException(String.format("The access policy id (%s) in the request body does not equal the "
+ "access policy id of the requested resource (%s).", accessPolicyDTO.getId(), id));
+ "access policy id of the requested resource (%s).", requestAccessPolicyDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, accessPolicyEntity);
return replicate(HttpMethod.PUT, requestAccessPolicyEntity);
}
// Extract the revision
final Revision revision = getRevision(accessPolicyEntity, id);
final Revision requestRevision = getRevision(requestAccessPolicyEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestAccessPolicyEntity,
requestRevision,
lookup -> {
Authorizable authorizable = lookup.getAccessPolicyById(id);
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
(revision, accessPolicyEntity) -> {
final AccessPolicyDTO accessPolicyDTO = accessPolicyEntity.getComponent();
// update the access policy
final AccessPolicyEntity entity = serviceFacade.updateAccessPolicy(revision, accessPolicyDTO);
populateRemainingAccessPolicyEntityContent(entity);
@ -454,20 +458,23 @@ public class AccessPolicyResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final AccessPolicyEntity requestAccessPolicyEntity = new AccessPolicyEntity();
requestAccessPolicyEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestAccessPolicyEntity,
requestRevision,
lookup -> {
final Authorizable accessPolicy = lookup.getAccessPolicyById(id);
accessPolicy.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> {
},
() -> {
null,
(revision, accessPolicyEntity) -> {
// delete the specified access policy
final AccessPolicyEntity entity = serviceFacade.deleteAccessPolicy(revision, id);
final AccessPolicyEntity entity = serviceFacade.deleteAccessPolicy(revision, accessPolicyEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.api;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.sun.jersey.api.core.HttpContext;
import com.sun.jersey.api.representation.Form;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@ -49,7 +51,10 @@ import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.apache.nifi.web.security.util.CacheKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,8 +80,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.Function;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static org.apache.commons.lang3.StringUtils.isEmpty;
@ -114,6 +121,8 @@ public abstract class ApplicationResource {
private RequestReplicator requestReplicator;
private ClusterCoordinator clusterCoordinator;
private static final int MAX_CACHE_SOFT_LIMIT = 500;
private final Cache<CacheKey, Request<? extends Entity>> twoPhaseCommitCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
/**
* Generate a resource uri based off of the specified parameters.
@ -348,8 +357,8 @@ public abstract class ApplicationResource {
* @return <code>true</code> if the request represents a two-phase commit style request
*/
protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) {
final String headerValue = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
return headerValue != null;
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
return transactionId != null && isConnectedToCluster();
}
/**
@ -366,6 +375,14 @@ public abstract class ApplicationResource {
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
}
protected boolean isExecutionPhase(final HttpServletRequest httpServletRequest) {
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER) != null;
}
protected boolean isCancellationPhase(final HttpServletRequest httpServletRequest) {
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER) != null;
}
/**
* Checks whether or not the request should be replicated to the cluster
*
@ -377,6 +394,7 @@ public abstract class ApplicationResource {
return false;
}
// If not connected to the cluster, we do not replicate
if (!isConnectedToCluster()) {
return false;
}
@ -468,12 +486,43 @@ public abstract class ApplicationResource {
* @param action executor
* @return the response
*/
protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final Revision revision, final AuthorizeAccess authorizer,
final Runnable verifier, final Supplier<Response> action) {
protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Revision revision, final AuthorizeAccess authorizer,
final Runnable verifier, final BiFunction<Revision, T, Response> action) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return withWriteLock(serviceFacade, authorizer, verifier, action,
() -> serviceFacade.verifyRevision(revision, user));
if (isTwoPhaseRequest(httpServletRequest)) {
if (isValidationPhase(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(authorizer);
serviceFacade.verifyRevision(revision, user);
// verify if necessary
if (verifier != null) {
verifier.run();
}
// store the request
phaseOneStoreTransaction(entity, revision, null);
return generateContinueResponse().build();
} else if (isExecutionPhase(httpServletRequest)) {
// get the original request and run the action
final Request<T> phaseOneRequest = phaseTwoVerifyTransaction();
return action.apply(phaseOneRequest.getRevision(), phaseOneRequest.getRequest());
} else if (isCancellationPhase(httpServletRequest)) {
cancelTransaction();
return generateOkResponse().build();
} else {
throw new IllegalStateException("This request does not appear to be part of the two phase commit.");
}
} else {
// authorize access and run the action
serviceFacade.authorizeAccess(authorizer);
serviceFacade.verifyRevision(revision, user);
return action.apply(revision, entity);
}
}
/**
@ -486,43 +535,197 @@ public abstract class ApplicationResource {
* @param action executor
* @return the response
*/
protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final Set<Revision> revisions, final AuthorizeAccess authorizer,
final Runnable verifier, final Supplier<Response> action) {
protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Set<Revision> revisions, final AuthorizeAccess authorizer,
final Runnable verifier, final BiFunction<Set<Revision>, T, Response> action) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return withWriteLock(serviceFacade, authorizer, verifier, action,
() -> serviceFacade.verifyRevisions(revisions, user));
if (isTwoPhaseRequest(httpServletRequest)) {
if (isValidationPhase(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(authorizer);
serviceFacade.verifyRevisions(revisions, user);
// verify if necessary
if (verifier != null) {
verifier.run();
}
// store the request
phaseOneStoreTransaction(entity, null, revisions);
return generateContinueResponse().build();
} else if (isExecutionPhase(httpServletRequest)) {
// get the original request and run the action
final Request<T> phaseOneRequest = phaseTwoVerifyTransaction();
return action.apply(phaseOneRequest.getRevisions(), phaseOneRequest.getRequest());
} else if (isCancellationPhase(httpServletRequest)) {
cancelTransaction();
return generateOkResponse().build();
} else {
throw new IllegalStateException("This request does not appear to be part of the two phase commit.");
}
} else {
// authorize access and run the action
serviceFacade.authorizeAccess(authorizer);
serviceFacade.verifyRevisions(revisions, user);
return action.apply(revisions, entity);
}
}
/**
* Executes an action through the service facade using the specified revision.
* Executes an action through the service facade.
*
* @param serviceFacade service facade
* @param authorizer authorizer
* @param verifier verifier
* @param action the action to execute
* @param verifyRevision a callback that will claim the necessary revisions for the operation
* @return the response
*/
private Response withWriteLock(
final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action,
final Runnable verifyRevision) {
protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final AuthorizeAccess authorizer,
final Runnable verifier, final Function<T, Response> action) {
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
if (isTwoPhaseRequest(httpServletRequest)) {
if (isValidationPhase(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(authorizer);
verifyRevision.run();
}
if (validationPhase) {
// verify if necessary
if (verifier != null) {
verifier.run();
}
// store the request
phaseOneStoreTransaction(entity, null, null);
return generateContinueResponse().build();
} else if (isExecutionPhase(httpServletRequest)) {
// get the original request and run the action
final Request<T> phaseOneRequest = phaseTwoVerifyTransaction();
return action.apply(phaseOneRequest.getRequest());
} else if (isCancellationPhase(httpServletRequest)) {
cancelTransaction();
return generateOkResponse().build();
} else {
throw new IllegalStateException("This request does not appear to be part of the two phase commit.");
}
} else {
// authorize access
serviceFacade.authorizeAccess(authorizer);
// run the action
return action.apply(entity);
}
}
return action.get();
private <T extends Entity> void phaseOneStoreTransaction(final T requestEntity, final Revision revision, final Set<Revision> revisions) {
if (twoPhaseCommitCache.size() > MAX_CACHE_SOFT_LIMIT) {
throw new IllegalStateException("The maximum number of requests are in progress.");
}
// get the transaction id
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
}
synchronized (twoPhaseCommitCache) {
final CacheKey key = new CacheKey(transactionId);
if (twoPhaseCommitCache.getIfPresent(key) != null) {
throw new IllegalStateException("Transaction " + transactionId + " is already in progress.");
}
// store the entry for the second phase
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Request<T> request = new Request<>(ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user), getAbsolutePath().toString(), revision, revisions, requestEntity);
twoPhaseCommitCache.put(key, request);
}
}
private <T extends Entity> Request<T> phaseTwoVerifyTransaction() {
// get the transaction id
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
}
// get the entry for the second phase
final Request<T> request;
synchronized (twoPhaseCommitCache) {
final CacheKey key = new CacheKey(transactionId);
request = (Request<T>) twoPhaseCommitCache.getIfPresent(key);
if (request == null) {
throw new IllegalArgumentException("The request from phase one is missing.");
}
twoPhaseCommitCache.invalidate(key);
}
final String phaseOneChain = request.getUserChain();
// build the chain for the current request
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final String phaseTwoChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
if (phaseOneChain == null || !phaseOneChain.equals(phaseTwoChain)) {
throw new IllegalArgumentException("The same user must issue the request for phase one and two.");
}
final String phaseOneUri = request.getUri();
if (phaseOneUri == null || !phaseOneUri.equals(getAbsolutePath().toString())) {
throw new IllegalArgumentException("The URI must be the same for phase one and two.");
}
return request;
}
private void cancelTransaction() {
// get the transaction id
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
}
synchronized (twoPhaseCommitCache) {
final CacheKey key = new CacheKey(transactionId);
twoPhaseCommitCache.invalidate(key);
}
}
private final class Request<T extends Entity> {
final String userChain;
final String uri;
final Revision revision;
final Set<Revision> revisions;
final T request;
public Request(String userChain, String uri, Revision revision, Set<Revision> revisions, T request) {
this.userChain = userChain;
this.uri = uri;
this.revision = revision;
this.revisions = revisions;
this.request = request;
}
public String getUserChain() {
return userChain;
}
public String getUri() {
return uri;
}
public Revision getRevision() {
return revision;
}
public Set<Revision> getRevisions() {
return revisions;
}
public T getRequest() {
return request;
}
}
/**
@ -713,7 +916,7 @@ public abstract class ApplicationResource {
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
} else {
return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false, true).awaitMergedResponse();
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse();
}
}

View File

@ -148,7 +148,7 @@ public class ConnectionResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the connection.
* @param connectionEntity A connectionEntity.
* @param requestConnectionEntity A connectionEntity.
* @return A connectionEntity.
* @throws InterruptedException if interrupted
*/
@ -185,36 +185,37 @@ public class ConnectionResource extends ApplicationResource {
@ApiParam(
value = "The connection configuration details.",
required = true
) final ConnectionEntity connectionEntity) throws InterruptedException {
) final ConnectionEntity requestConnectionEntity) throws InterruptedException {
if (connectionEntity == null || connectionEntity.getComponent() == null) {
if (requestConnectionEntity == null || requestConnectionEntity.getComponent() == null) {
throw new IllegalArgumentException("Connection details must be specified.");
}
if (connectionEntity.getRevision() == null) {
if (requestConnectionEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final ConnectionDTO connection = connectionEntity.getComponent();
if (!id.equals(connection.getId())) {
final ConnectionDTO requestConnection = requestConnectionEntity.getComponent();
if (!id.equals(requestConnection.getId())) {
throw new IllegalArgumentException(String.format("The connection id "
+ "(%s) in the request body does not equal the connection id of the "
+ "requested resource (%s).", connection.getId(), id));
+ "requested resource (%s).", requestConnection.getId(), id));
}
if (connection.getDestination() != null && connection.getDestination().getId() == null) {
if (requestConnection.getDestination() != null && requestConnection.getDestination().getId() == null) {
throw new IllegalArgumentException("When specifying a destination component, the destination id is required.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, connectionEntity);
return replicate(HttpMethod.PUT, requestConnectionEntity);
}
final Revision revision = getRevision(connectionEntity, id);
final Revision requestRevision = getRevision(requestConnectionEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestConnectionEntity,
requestRevision,
lookup -> {
// verifies write access to this connection (this checks the current source and destination)
ConnectionAuthorizable connAuth = lookup.getConnection(id);
@ -222,17 +223,19 @@ public class ConnectionResource extends ApplicationResource {
// if a destination has been specified and is different
final Connectable currentDestination = connAuth.getDestination();
if (connection.getDestination() != null && currentDestination.getIdentifier().equals(connection.getDestination().getId())) {
if (requestConnection.getDestination() != null && currentDestination.getIdentifier().equals(requestConnection.getDestination().getId())) {
// verify access of the new destination (current destination was already authorized as part of the connection check)
final Authorizable newDestinationAuthorizable = lookup.getConnectable(connection.getDestination().getId());
final Authorizable newDestinationAuthorizable = lookup.getConnectable(requestConnection.getDestination().getId());
newDestinationAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
// verify access of the parent group (this is the same check that is performed when creating the connection)
connAuth.getParentGroup().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
}
},
() -> serviceFacade.verifyUpdateConnection(connection),
() -> {
() -> serviceFacade.verifyUpdateConnection(requestConnection),
(revision, connectionEntity) -> {
final ConnectionDTO connection = connectionEntity.getComponent();
final ConnectionEntity entity = serviceFacade.updateConnection(revision, connection);
populateRemainingConnectionEntityContent(entity);
@ -296,21 +299,25 @@ public class ConnectionResource extends ApplicationResource {
// determine the specified version
final Long clientVersion = version == null ? null : version.getLong();
final Revision revision = new Revision(clientVersion, clientId.getClientId(), id);
final Revision requestRevision = new Revision(clientVersion, clientId.getClientId(), id);
final ConnectionEntity requestConnectionEntity = new ConnectionEntity();
requestConnectionEntity.setId(id);
// get the current user
return withWriteLock(
serviceFacade,
revision,
requestConnectionEntity,
requestRevision,
lookup -> {
// verifies write access to the source and destination
final Authorizable authorizable = lookup.getConnection(id).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyDeleteConnection(id),
() -> {
(revision, connectionEntity) -> {
// delete the connection
final ConnectionEntity entity = serviceFacade.deleteConnection(revision, id);
final ConnectionEntity entity = serviceFacade.deleteConnection(revision, connectionEntity.getId());
// generate the response
return clusterContext(generateOkResponse(entity)).build();

View File

@ -47,6 +47,7 @@ import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.HistoryEntity;
import org.apache.nifi.web.api.entity.NodeEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
@ -67,6 +68,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -159,7 +161,7 @@ public class ControllerResource extends ApplicationResource {
* Update the configuration for this NiFi.
*
* @param httpServletRequest request
* @param configEntity A controllerConfigurationEntity.
* @param requestConfigEntity A controllerConfigurationEntity.
* @return A controllerConfigurationEntity.
*/
@PUT
@ -186,29 +188,30 @@ public class ControllerResource extends ApplicationResource {
@ApiParam(
value = "The controller configuration.",
required = true
) final ControllerConfigurationEntity configEntity) {
) final ControllerConfigurationEntity requestConfigEntity) {
if (configEntity == null || configEntity.getComponent() == null) {
if (requestConfigEntity == null || requestConfigEntity.getComponent() == null) {
throw new IllegalArgumentException("Controller configuration must be specified");
}
if (configEntity.getRevision() == null) {
if (requestConfigEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, configEntity);
return replicate(HttpMethod.PUT, requestConfigEntity);
}
final Revision revision = getRevision(configEntity.getRevision(), FlowController.class.getSimpleName());
final Revision requestRevision = getRevision(requestConfigEntity.getRevision(), FlowController.class.getSimpleName());
return withWriteLock(
serviceFacade,
revision,
requestConfigEntity,
requestRevision,
lookup -> {
authorizeController(RequestAction.WRITE);
},
null,
() -> {
(revision, configEntity) -> {
final ControllerConfigurationEntity entity = serviceFacade.updateControllerConfiguration(revision, configEntity.getComponent());
return clusterContext(generateOkResponse(entity)).build();
}
@ -223,7 +226,7 @@ public class ControllerResource extends ApplicationResource {
* Creates a new Reporting Task.
*
* @param httpServletRequest request
* @param reportingTaskEntity A reportingTaskEntity.
* @param requestReportingTaskEntity A reportingTaskEntity.
* @return A reportingTaskEntity.
*/
@POST
@ -251,17 +254,17 @@ public class ControllerResource extends ApplicationResource {
@ApiParam(
value = "The reporting task configuration details.",
required = true
) final ReportingTaskEntity reportingTaskEntity) {
) final ReportingTaskEntity requestReportingTaskEntity) {
if (reportingTaskEntity == null || reportingTaskEntity.getComponent() == null) {
if (requestReportingTaskEntity == null || requestReportingTaskEntity.getComponent() == null) {
throw new IllegalArgumentException("Reporting task details must be specified.");
}
if (reportingTaskEntity.getRevision() == null || (reportingTaskEntity.getRevision().getVersion() == null || reportingTaskEntity.getRevision().getVersion() != 0)) {
if (requestReportingTaskEntity.getRevision() == null || (requestReportingTaskEntity.getRevision().getVersion() == null || requestReportingTaskEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Reporting task.");
}
final ReportingTaskDTO requestReportingTask = reportingTaskEntity.getComponent();
final ReportingTaskDTO requestReportingTask = requestReportingTaskEntity.getComponent();
if (requestReportingTask.getId() != null) {
throw new IllegalArgumentException("Reporting task ID cannot be specified.");
}
@ -271,37 +274,37 @@ public class ControllerResource extends ApplicationResource {
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, reportingTaskEntity);
return replicate(HttpMethod.POST, requestReportingTaskEntity);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
return withWriteLock(
serviceFacade,
requestReportingTaskEntity,
lookup -> {
authorizeController(RequestAction.WRITE);
if (requestReportingTask.getProperties() != null) {
final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getReportingTaskByType(requestReportingTask.getType());
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestReportingTask.getProperties(), authorizable, authorizer, lookup);
}
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
},
null,
(reportingTaskEntity) -> {
final ReportingTaskDTO reportingTask = reportingTaskEntity.getComponent();
// set the processor id as appropriate
requestReportingTask.setId(generateUuid());
reportingTask.setId(generateUuid());
// create the reporting task and generate the json
final Revision revision = getRevision(reportingTaskEntity, requestReportingTask.getId());
final ReportingTaskEntity entity = serviceFacade.createReportingTask(revision, requestReportingTask);
final Revision revision = getRevision(reportingTaskEntity, reportingTask.getId());
final ReportingTaskEntity entity = serviceFacade.createReportingTask(revision, reportingTask);
reportingTaskResource.populateRemainingReportingTaskEntityContent(entity);
// build the response
return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build();
}
);
}
// -------------------
// controller services
@ -311,7 +314,7 @@ public class ControllerResource extends ApplicationResource {
* Creates a new Controller Service.
*
* @param httpServletRequest request
* @param controllerServiceEntity A controllerServiceEntity.
* @param requestControllerServiceEntity A controllerServiceEntity.
* @return A controllerServiceEntity.
*/
@POST
@ -339,17 +342,17 @@ public class ControllerResource extends ApplicationResource {
@ApiParam(
value = "The controller service configuration details.",
required = true
) final ControllerServiceEntity controllerServiceEntity) {
) final ControllerServiceEntity requestControllerServiceEntity) {
if (controllerServiceEntity == null || controllerServiceEntity.getComponent() == null) {
if (requestControllerServiceEntity == null || requestControllerServiceEntity.getComponent() == null) {
throw new IllegalArgumentException("Controller service details must be specified.");
}
if (controllerServiceEntity.getRevision() == null || (controllerServiceEntity.getRevision().getVersion() == null || controllerServiceEntity.getRevision().getVersion() != 0)) {
if (requestControllerServiceEntity.getRevision() == null || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service.");
}
final ControllerServiceDTO requestControllerService = controllerServiceEntity.getComponent();
final ControllerServiceDTO requestControllerService = requestControllerServiceEntity.getComponent();
if (requestControllerService.getId() != null) {
throw new IllegalArgumentException("Controller service ID cannot be specified.");
}
@ -363,37 +366,37 @@ public class ControllerResource extends ApplicationResource {
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, controllerServiceEntity);
return replicate(HttpMethod.POST, requestControllerServiceEntity);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
return withWriteLock(
serviceFacade,
requestControllerServiceEntity,
lookup -> {
authorizeController(RequestAction.WRITE);
if (requestControllerService.getProperties() != null) {
final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getControllerServiceByType(requestControllerService.getType());
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(), authorizable, authorizer, lookup);
}
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
},
null,
(controllerServiceEntity) -> {
final ControllerServiceDTO controllerService = controllerServiceEntity.getComponent();
// set the processor id as appropriate
requestControllerService.setId(generateUuid());
controllerService.setId(generateUuid());
// create the controller service and generate the json
final Revision revision = getRevision(controllerServiceEntity, requestControllerService.getId());
final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, null, requestControllerService);
final Revision revision = getRevision(controllerServiceEntity, controllerService.getId());
final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, null, controllerService);
controllerServiceResource.populateRemainingControllerServiceEntityContent(entity);
// build the response
return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build();
}
);
}
// -------
// cluster
@ -666,26 +669,33 @@ public class ControllerResource extends ApplicationResource {
// Note: History requests are not replicated throughout the cluster and are instead handled by the nodes independently
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
return withWriteLock(
serviceFacade,
new EndDateEntity(endDate.getDateTime()),
lookup -> {
authorizeController(RequestAction.WRITE);
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
},
null,
(endDateEtity) -> {
// purge the actions
serviceFacade.deleteActions(endDate.getDateTime());
// create the response entity
final HistoryEntity entity = new HistoryEntity();
serviceFacade.deleteActions(endDateEtity.getEndDate());
// generate the response
return generateOkResponse(entity).build();
return generateOkResponse(new HistoryEntity()).build();
}
);
}
private class EndDateEntity extends Entity {
final Date endDate;
public EndDateEntity(Date endDate) {
this.endDate = endDate;
}
public Date getEndDate() {
return endDate;
}
}
// setters

View File

@ -346,21 +346,20 @@ public class ControllerServiceResource extends ApplicationResource {
return replicate(HttpMethod.POST);
}
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final ControllerServiceEntity requestControllerServiceEntity = new ControllerServiceEntity();
requestControllerServiceEntity.setId(id);
return withWriteLock(
serviceFacade,
requestControllerServiceEntity,
lookup -> {
final Authorizable controllerService = lookup.getControllerService(id).getAuthorizable();
controllerService.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
if (validationPhase) {
serviceFacade.verifyCanClearControllerServiceState(id);
return generateContinueResponse().build();
}
},
() -> serviceFacade.verifyCanClearControllerServiceState(id),
(controllerServiceEntity) -> {
// get the component state
serviceFacade.clearControllerServiceState(id);
serviceFacade.clearControllerServiceState(controllerServiceEntity.getId());
// generate the response entity
final ComponentStateEntity entity = new ComponentStateEntity();
@ -368,6 +367,8 @@ public class ControllerServiceResource extends ApplicationResource {
// generate the response
return clusterContext(generateOkResponse(entity)).build();
}
);
}
/**
* Retrieves the references of the specified controller service.
@ -422,7 +423,7 @@ public class ControllerServiceResource extends ApplicationResource {
* Updates the references of the specified controller service.
*
* @param httpServletRequest request
* @param updateReferenceRequest The update request
* @param requestUpdateReferenceRequest The update request
* @return A controllerServiceReferencingComponentsEntity.
*/
@PUT
@ -455,13 +456,13 @@ public class ControllerServiceResource extends ApplicationResource {
@ApiParam(
value = "The controller service request update request.",
required = true
) final UpdateControllerServiceReferenceRequestEntity updateReferenceRequest) {
) final UpdateControllerServiceReferenceRequestEntity requestUpdateReferenceRequest) {
if (updateReferenceRequest.getId() == null) {
if (requestUpdateReferenceRequest.getId() == null) {
throw new IllegalArgumentException("The controller service identifier must be specified.");
}
if (updateReferenceRequest.getReferencingComponentRevisions() == null) {
if (requestUpdateReferenceRequest.getReferencingComponentRevisions() == null) {
throw new IllegalArgumentException("The controller service referencing components revisions must be specified.");
}
@ -471,14 +472,14 @@ public class ControllerServiceResource extends ApplicationResource {
// but not referencing schedulable components
ControllerServiceState requestControllerServiceState = null;
try {
requestControllerServiceState = ControllerServiceState.valueOf(updateReferenceRequest.getState());
requestControllerServiceState = ControllerServiceState.valueOf(requestUpdateReferenceRequest.getState());
} catch (final IllegalArgumentException iae) {
// ignore
}
ScheduledState requestScheduledState = null;
try {
requestScheduledState = ScheduledState.valueOf(updateReferenceRequest.getState());
requestScheduledState = ScheduledState.valueOf(requestUpdateReferenceRequest.getState());
} catch (final IllegalArgumentException iae) {
// ignore
}
@ -498,30 +499,51 @@ public class ControllerServiceResource extends ApplicationResource {
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, updateReferenceRequest);
return replicate(HttpMethod.PUT, requestUpdateReferenceRequest);
}
// convert the referencing revisions
final Map<String, Revision> requestReferencingRevisions = requestUpdateReferenceRequest.getReferencingComponentRevisions().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> {
final RevisionDTO rev = e.getValue();
return new Revision(rev.getVersion(), rev.getClientId(), e.getKey());
}));
final Set<Revision> requestRevisions = new HashSet<>(requestReferencingRevisions.values());
final ScheduledState verifyScheduledState = requestScheduledState;
final ControllerServiceState verifyControllerServiceState = requestControllerServiceState;
return withWriteLock(
serviceFacade,
requestUpdateReferenceRequest,
requestRevisions,
lookup -> {
requestReferencingRevisions.entrySet().stream().forEach(e -> {
final Authorizable controllerService = lookup.getControllerServiceReferencingComponent(id, e.getKey());
controllerService.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
},
() -> serviceFacade.verifyUpdateControllerServiceReferencingComponents(requestUpdateReferenceRequest.getId(), verifyScheduledState, verifyControllerServiceState),
(revisions, updateReferenceRequest) -> {
ScheduledState scheduledState = null;
try {
scheduledState = ScheduledState.valueOf(updateReferenceRequest.getState());
} catch (final IllegalArgumentException e) {
// ignore
}
ControllerServiceState controllerServiceState = null;
try {
controllerServiceState = ControllerServiceState.valueOf(updateReferenceRequest.getState());
} catch (final IllegalArgumentException iae) {
// ignore
}
final Map<String, Revision> referencingRevisions = updateReferenceRequest.getReferencingComponentRevisions().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> {
final RevisionDTO rev = e.getValue();
return new Revision(rev.getVersion(), rev.getClientId(), e.getKey());
}));
final Set<Revision> revisions = new HashSet<>(referencingRevisions.values());
final ScheduledState scheduledState = requestScheduledState;
final ControllerServiceState controllerServiceState = requestControllerServiceState;
return withWriteLock(
serviceFacade,
revisions,
lookup -> {
referencingRevisions.entrySet().stream().forEach(e -> {
final Authorizable controllerService = lookup.getControllerServiceReferencingComponent(id, e.getKey());
controllerService.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
},
() -> serviceFacade.verifyUpdateControllerServiceReferencingComponents(updateReferenceRequest.getId(), scheduledState, controllerServiceState),
() -> {
// update the controller service references
final ControllerServiceReferencingComponentsEntity entity = serviceFacade.updateControllerServiceReferencingComponents(
referencingRevisions, updateReferenceRequest.getId(), scheduledState, controllerServiceState);
@ -536,7 +558,7 @@ public class ControllerServiceResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the controller service to update.
* @param controllerServiceEntity A controllerServiceEntity.
* @param requestControllerServiceEntity A controllerServiceEntity.
* @return A controllerServiceEntity.
*/
@PUT
@ -570,32 +592,33 @@ public class ControllerServiceResource extends ApplicationResource {
@ApiParam(
value = "The controller service configuration details.",
required = true
) final ControllerServiceEntity controllerServiceEntity) {
) final ControllerServiceEntity requestControllerServiceEntity) {
if (controllerServiceEntity == null || controllerServiceEntity.getComponent() == null) {
if (requestControllerServiceEntity == null || requestControllerServiceEntity.getComponent() == null) {
throw new IllegalArgumentException("Controller service details must be specified.");
}
if (controllerServiceEntity.getRevision() == null) {
if (requestControllerServiceEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final ControllerServiceDTO requestControllerServiceDTO = controllerServiceEntity.getComponent();
final ControllerServiceDTO requestControllerServiceDTO = requestControllerServiceEntity.getComponent();
if (!id.equals(requestControllerServiceDTO.getId())) {
throw new IllegalArgumentException(String.format("The controller service id (%s) in the request body does not equal the "
+ "controller service id of the requested resource (%s).", requestControllerServiceDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, controllerServiceEntity);
return replicate(HttpMethod.PUT, requestControllerServiceEntity);
}
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(controllerServiceEntity, id);
final Revision requestRevision = getRevision(requestControllerServiceEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestControllerServiceEntity,
requestRevision,
lookup -> {
// authorize the service
final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getControllerService(id);
@ -605,9 +628,11 @@ public class ControllerServiceResource extends ApplicationResource {
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerServiceDTO.getProperties(), authorizable, authorizer, lookup);
},
() -> serviceFacade.verifyUpdateControllerService(requestControllerServiceDTO),
() -> {
(revision, controllerServiceEntity) -> {
final ControllerServiceDTO controllerService = controllerServiceEntity.getComponent();
// update the controller service
final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, requestControllerServiceDTO);
final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerService);
populateRemainingControllerServiceEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -669,19 +694,23 @@ public class ControllerServiceResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final ControllerServiceEntity requestControllerServiceEntity = new ControllerServiceEntity();
requestControllerServiceEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestControllerServiceEntity,
requestRevision,
lookup -> {
final Authorizable controllerService = lookup.getControllerService(id).getAuthorizable();
controllerService.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyDeleteControllerService(id),
() -> {
(revision, controllerServiceEntity) -> {
// delete the specified controller service
final ControllerServiceEntity entity = serviceFacade.deleteControllerService(revision, id);
final ControllerServiceEntity entity = serviceFacade.deleteControllerService(revision, controllerServiceEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -40,6 +40,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.entity.CounterEntity;
import org.apache.nifi.web.api.entity.CountersEntity;
@ -233,20 +234,19 @@ public class CountersResource extends ApplicationResource {
return replicate(HttpMethod.PUT);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
authorizeCounters(RequestAction.WRITE);
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
final ComponentEntity requestComponentEntity = new ComponentEntity();
requestComponentEntity.setId(id);
return withWriteLock(
serviceFacade,
requestComponentEntity,
lookup -> {
authorizeCounters(RequestAction.WRITE);
},
null,
(componentEntity) -> {
// reset the specified counter
final CounterDTO counter = serviceFacade.updateCounter(id);
final CounterDTO counter = serviceFacade.updateCounter(requestComponentEntity.getId());
// create the response entity
final CounterEntity entity = new CounterEntity();
@ -255,6 +255,8 @@ public class CountersResource extends ApplicationResource {
// generate the response
return clusterContext(generateOkResponse(entity)).build();
}
);
}
// setters

View File

@ -37,7 +37,9 @@ import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.FlowFileDTO;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
@ -321,27 +323,25 @@ public class FlowFileQueueResource extends ApplicationResource {
return replicate(HttpMethod.POST);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final ConnectionEntity requestConnectionEntity = new ConnectionEntity();
requestConnectionEntity.setId(id);
return withWriteLock(
serviceFacade,
requestConnectionEntity,
lookup -> {
final ConnectionAuthorizable connAuth = lookup.getConnection(id);
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
}
if (validationPhase) {
serviceFacade.verifyListQueue(id);
return generateContinueResponse().build();
}
},
() -> serviceFacade.verifyListQueue(id),
(connectionEntity) -> {
// ensure the id is the same across the cluster
final String listingRequestId = generateUuid();
// submit the listing request
final ListingRequestDTO listingRequest = serviceFacade.createFlowFileListingRequest(id, listingRequestId);
populateRemainingFlowFileListingContent(id, listingRequest);
final ListingRequestDTO listingRequest = serviceFacade.createFlowFileListingRequest(connectionEntity.getId(), listingRequestId);
populateRemainingFlowFileListingContent(connectionEntity.getId(), listingRequest);
// create the response entity
final ListingRequestEntity entity = new ListingRequestEntity();
@ -351,6 +351,8 @@ public class FlowFileQueueResource extends ApplicationResource {
final URI location = URI.create(listingRequest.getUri());
return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
}
);
}
/**
* Checks the status of an outstanding listing request.
@ -458,28 +460,24 @@ public class FlowFileQueueResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
return withWriteLock(
serviceFacade,
new ListingEntity(connectionId, listingRequestId),
lookup -> {
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
},
null,
(listingEntity) -> {
// delete the listing request
final ListingRequestDTO listingRequest = serviceFacade.deleteFlowFileListingRequest(connectionId, listingRequestId);
final ListingRequestDTO listingRequest = serviceFacade.deleteFlowFileListingRequest(listingEntity.getConnectionId(), listingEntity.getListingRequestId());
// prune the results as they were already received when the listing completed
listingRequest.setFlowFileSummaries(null);
// populate remaining content
populateRemainingFlowFileListingContent(connectionId, listingRequest);
populateRemainingFlowFileListingContent(listingEntity.getConnectionId(), listingRequest);
// create the response entity
final ListingRequestEntity entity = new ListingRequestEntity();
@ -487,6 +485,26 @@ public class FlowFileQueueResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
);
}
private static class ListingEntity extends Entity {
final String connectionId;
final String listingRequestId;
public ListingEntity(String connectionId, String listingRequestId) {
this.connectionId = connectionId;
this.listingRequestId = listingRequestId;
}
public String getConnectionId() {
return connectionId;
}
public String getListingRequestId() {
return listingRequestId;
}
}
/**
* Creates a request to delete the flowfiles in the queue of the specified connection.
@ -528,26 +546,25 @@ public class FlowFileQueueResource extends ApplicationResource {
return replicate(HttpMethod.POST);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final ConnectionEntity requestConnectionEntity = new ConnectionEntity();
requestConnectionEntity.setId(id);
return withWriteLock(
serviceFacade,
requestConnectionEntity,
lookup -> {
final ConnectionAuthorizable connAuth = lookup.getConnection(id);
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
},
null,
(connectionEntity) -> {
// ensure the id is the same across the cluster
final String dropRequestId = generateUuid();
// submit the drop request
final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(id, dropRequestId);
dropRequest.setUri(generateResourceUri("flowfile-queues", id, "drop-requests", dropRequest.getId()));
final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(connectionEntity.getId(), dropRequestId);
dropRequest.setUri(generateResourceUri("flowfile-queues", connectionEntity.getId(), "drop-requests", dropRequest.getId()));
// create the response entity
final DropRequestEntity entity = new DropRequestEntity();
@ -557,6 +574,8 @@ public class FlowFileQueueResource extends ApplicationResource {
final URI location = URI.create(dropRequest.getUri());
return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
}
);
}
/**
* Checks the status of an outstanding drop request.
@ -664,23 +683,19 @@ public class FlowFileQueueResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
return withWriteLock(
serviceFacade,
new DropEntity(connectionId, dropRequestId),
lookup -> {
final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId);
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier());
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
},
null,
(dropEntity) -> {
// delete the drop request
final DropRequestDTO dropRequest = serviceFacade.deleteFlowFileDropRequest(connectionId, dropRequestId);
dropRequest.setUri(generateResourceUri("flowfile-queues", connectionId, "drop-requests", dropRequestId));
final DropRequestDTO dropRequest = serviceFacade.deleteFlowFileDropRequest(dropEntity.getConnectionId(), dropEntity.getDropRequestId());
dropRequest.setUri(generateResourceUri("flowfile-queues", dropEntity.getConnectionId(), "drop-requests", dropEntity.getDropRequestId()));
// create the response entity
final DropRequestEntity entity = new DropRequestEntity();
@ -688,6 +703,26 @@ public class FlowFileQueueResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
);
}
private static class DropEntity extends Entity {
final String connectionId;
final String dropRequestId;
public DropEntity(String connectionId, String dropRequestId) {
this.connectionId = connectionId;
this.dropRequestId = dropRequestId;
}
public String getConnectionId() {
return connectionId;
}
public String getDropRequestId() {
return dropRequestId;
}
}
// setters
public void setServiceFacade(final NiFiServiceFacade serviceFacade) {

View File

@ -527,7 +527,7 @@ public class FlowResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the process group.
* @param scheduleComponentsEntity A scheduleComponentsEntity.
* @param requestScheduleComponentsEntity A scheduleComponentsEntity.
* @return A processGroupEntity.
*/
@PUT
@ -559,20 +559,23 @@ public class FlowResource extends ApplicationResource {
required = true
)
@PathParam("id") String id,
ScheduleComponentsEntity scheduleComponentsEntity) {
@ApiParam(
value = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.",
required = true
) final ScheduleComponentsEntity requestScheduleComponentsEntity) {
// ensure the same id is being used
if (!id.equals(scheduleComponentsEntity.getId())) {
if (!id.equals(requestScheduleComponentsEntity.getId())) {
throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+ "not equal the process group id of the requested resource (%s).", scheduleComponentsEntity.getId(), id));
+ "not equal the process group id of the requested resource (%s).", requestScheduleComponentsEntity.getId(), id));
}
final ScheduledState state;
if (scheduleComponentsEntity.getState() == null) {
if (requestScheduleComponentsEntity.getState() == null) {
throw new IllegalArgumentException("The scheduled state must be specified.");
} else {
try {
state = ScheduledState.valueOf(scheduleComponentsEntity.getState());
state = ScheduledState.valueOf(requestScheduleComponentsEntity.getState());
} catch (final IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", ")));
}
@ -584,7 +587,7 @@ public class FlowResource extends ApplicationResource {
}
// if the components are not specified, gather all components and their current revision
if (scheduleComponentsEntity.getComponents() == null) {
if (requestScheduleComponentsEntity.getComponents() == null) {
// get the current revisions for the components being updated
final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> {
final Set<String> componentIds = new HashSet<>();
@ -626,34 +629,42 @@ public class FlowResource extends ApplicationResource {
});
// set the components and their current revision
scheduleComponentsEntity.setComponents(componentsToSchedule);
requestScheduleComponentsEntity.setComponents(componentsToSchedule);
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, scheduleComponentsEntity);
return replicate(HttpMethod.PUT, requestScheduleComponentsEntity);
}
final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents();
final Map<String, Revision> componentRevisions = componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
final Set<Revision> revisions = new HashSet<>(componentRevisions.values());
final Map<String, RevisionDTO> requestComponentsToSchedule = requestScheduleComponentsEntity.getComponents();
final Map<String, Revision> requestComponentRevisions =
requestComponentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
final Set<Revision> requestRevisions = new HashSet<>(requestComponentRevisions.values());
return withWriteLock(
serviceFacade,
revisions,
requestScheduleComponentsEntity,
requestRevisions,
lookup -> {
// ensure access to the flow
authorizeFlow();
// ensure access to every component being scheduled
componentsToSchedule.keySet().forEach(componentId -> {
requestComponentsToSchedule.keySet().forEach(componentId -> {
final Authorizable connectable = lookup.getConnectable(componentId);
connectable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
},
() -> serviceFacade.verifyScheduleComponents(id, state, componentRevisions.keySet()),
() -> {
() -> serviceFacade.verifyScheduleComponents(id, state, requestComponentRevisions.keySet()),
(revisions, scheduleComponentsEntity) -> {
final ScheduledState scheduledState = ScheduledState.valueOf(scheduleComponentsEntity.getState());
final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents();
final Map<String, Revision> componentRevisions =
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
// update the process group
final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, state, componentRevisions);
final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -142,7 +142,7 @@ public class FunnelResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the funnel to update.
* @param funnelEntity A funnelEntity.
* @param requestFunnelEntity A funnelEntity.
* @return A funnelEntity.
*/
@PUT
@ -175,40 +175,41 @@ public class FunnelResource extends ApplicationResource {
@ApiParam(
value = "The funnel configuration details.",
required = true
) final FunnelEntity funnelEntity) {
) final FunnelEntity requestFunnelEntity) {
if (funnelEntity == null || funnelEntity.getComponent() == null) {
if (requestFunnelEntity == null || requestFunnelEntity.getComponent() == null) {
throw new IllegalArgumentException("Funnel details must be specified.");
}
if (funnelEntity.getRevision() == null) {
if (requestFunnelEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final FunnelDTO requestFunnelDTO = funnelEntity.getComponent();
final FunnelDTO requestFunnelDTO = requestFunnelEntity.getComponent();
if (!id.equals(requestFunnelDTO.getId())) {
throw new IllegalArgumentException(String.format("The funnel id (%s) in the request body does not equal the "
+ "funnel id of the requested resource (%s).", requestFunnelDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, funnelEntity);
return replicate(HttpMethod.PUT, requestFunnelEntity);
}
// Extract the revision
final Revision revision = getRevision(funnelEntity, id);
final Revision requestRevision = getRevision(requestFunnelEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestFunnelEntity,
requestRevision,
lookup -> {
Authorizable authorizable = lookup.getFunnel(id);
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
(revision, funnelEntity) -> {
// update the funnel
final FunnelEntity entity = serviceFacade.updateFunnel(revision, requestFunnelDTO);
final FunnelEntity entity = serviceFacade.updateFunnel(revision, funnelEntity.getComponent());
populateRemainingFunnelEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -270,19 +271,23 @@ public class FunnelResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final FunnelEntity requestFunnelEntity = new FunnelEntity();
requestFunnelEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestFunnelEntity,
requestRevision,
lookup -> {
final Authorizable funnel = lookup.getFunnel(id);
funnel.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyDeleteFunnel(id),
() -> {
(revision, funnelEntity) -> {
// delete the specified funnel
final FunnelEntity entity = serviceFacade.deleteFunnel(revision, id);
final FunnelEntity entity = serviceFacade.deleteFunnel(revision, funnelEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -142,7 +142,7 @@ public class InputPortResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the input port to update.
* @param portEntity A inputPortEntity.
* @param requestPortEntity A inputPortEntity.
* @return A inputPortEntity.
*/
@PUT
@ -175,40 +175,43 @@ public class InputPortResource extends ApplicationResource {
@ApiParam(
value = "The input port configuration details.",
required = true
) final PortEntity portEntity) {
) final PortEntity requestPortEntity) {
if (portEntity == null || portEntity.getComponent() == null) {
if (requestPortEntity == null || requestPortEntity.getComponent() == null) {
throw new IllegalArgumentException("Input port details must be specified.");
}
if (portEntity.getRevision() == null) {
if (requestPortEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final PortDTO requestPortDTO = portEntity.getComponent();
final PortDTO requestPortDTO = requestPortEntity.getComponent();
if (!id.equals(requestPortDTO.getId())) {
throw new IllegalArgumentException(String.format("The input port id (%s) in the request body does not equal the "
+ "input port id of the requested resource (%s).", requestPortDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, portEntity);
return replicate(HttpMethod.PUT, requestPortEntity);
}
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(portEntity, id);
final Revision requestRevision = getRevision(requestPortEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestPortEntity,
requestRevision,
lookup -> {
Authorizable authorizable = lookup.getInputPort(id);
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyUpdateInputPort(requestPortDTO),
() -> {
(revision, portEntity) -> {
final PortDTO portDTO = portEntity.getComponent();
// update the input port
final PortEntity entity = serviceFacade.updateInputPort(revision, requestPortDTO);
final PortEntity entity = serviceFacade.updateInputPort(revision, portDTO);
populateRemainingInputPortEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -267,19 +270,23 @@ public class InputPortResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final PortEntity requestPortEntity = new PortEntity();
requestPortEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestPortEntity,
requestRevision,
lookup -> {
final Authorizable inputPort = lookup.getInputPort(id);
inputPort.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyDeleteInputPort(id),
() -> {
(revision, portEntity) -> {
// delete the specified input port
final PortEntity entity = serviceFacade.deleteInputPort(revision, id);
final PortEntity entity = serviceFacade.deleteInputPort(revision, portEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -142,7 +142,7 @@ public class LabelResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the label to update.
* @param labelEntity A labelEntity.
* @param requestLabelEntity A labelEntity.
* @return A labelEntity.
*/
@PUT
@ -175,40 +175,43 @@ public class LabelResource extends ApplicationResource {
@ApiParam(
value = "The label configuraiton details.",
required = true
) final LabelEntity labelEntity) {
) final LabelEntity requestLabelEntity) {
if (labelEntity == null || labelEntity.getComponent() == null) {
if (requestLabelEntity == null || requestLabelEntity.getComponent() == null) {
throw new IllegalArgumentException("Label details must be specified.");
}
if (labelEntity.getRevision() == null) {
if (requestLabelEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final LabelDTO requestLabelDTO = labelEntity.getComponent();
final LabelDTO requestLabelDTO = requestLabelEntity.getComponent();
if (!id.equals(requestLabelDTO.getId())) {
throw new IllegalArgumentException(String.format("The label id (%s) in the request body does not equal the "
+ "label id of the requested resource (%s).", requestLabelDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, labelEntity);
return replicate(HttpMethod.PUT, requestLabelEntity);
}
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(labelEntity, id);
final Revision requestRevision = getRevision(requestLabelEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestLabelEntity,
requestRevision,
lookup -> {
Authorizable authorizable = lookup.getLabel(id);
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
(revision, labelEntity) -> {
final LabelDTO labelDTO = labelEntity.getComponent();
// update the label
final LabelEntity entity = serviceFacade.updateLabel(revision, requestLabelDTO);
final LabelEntity entity = serviceFacade.updateLabel(revision, labelDTO);
populateRemainingLabelEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -267,19 +270,23 @@ public class LabelResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final LabelEntity requestLabelEntity = new LabelEntity();
requestLabelEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestLabelEntity,
requestRevision,
lookup -> {
final Authorizable label = lookup.getLabel(id);
label.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
(revision, labelEntity) -> {
// delete the specified label
final LabelEntity entity = serviceFacade.deleteLabel(revision, id);
final LabelEntity entity = serviceFacade.deleteLabel(revision, labelEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -142,7 +142,7 @@ public class OutputPortResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the output port to update.
* @param portEntity A outputPortEntity.
* @param requestPortEntity A outputPortEntity.
* @return A outputPortEntity.
*/
@PUT
@ -175,40 +175,43 @@ public class OutputPortResource extends ApplicationResource {
@ApiParam(
value = "The output port configuration details.",
required = true
) final PortEntity portEntity) {
) final PortEntity requestPortEntity) {
if (portEntity == null || portEntity.getComponent() == null) {
if (requestPortEntity == null || requestPortEntity.getComponent() == null) {
throw new IllegalArgumentException("Output port details must be specified.");
}
if (portEntity.getRevision() == null) {
if (requestPortEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
PortDTO requestPortDTO = portEntity.getComponent();
PortDTO requestPortDTO = requestPortEntity.getComponent();
if (!id.equals(requestPortDTO.getId())) {
throw new IllegalArgumentException(String.format("The output port id (%s) in the request body does not equal the "
+ "output port id of the requested resource (%s).", requestPortDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, portEntity);
return replicate(HttpMethod.PUT, requestPortEntity);
}
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(portEntity, id);
final Revision requestRevision = getRevision(requestPortEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestPortEntity,
requestRevision,
lookup -> {
Authorizable authorizable = lookup.getOutputPort(id);
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyUpdateOutputPort(requestPortDTO),
() -> {
(revision, portEntity) -> {
final PortDTO portDTO = portEntity.getComponent();
// update the output port
final PortEntity entity = serviceFacade.updateOutputPort(revision, requestPortDTO);
final PortEntity entity = serviceFacade.updateOutputPort(revision, portDTO);
populateRemainingOutputPortEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -267,19 +270,23 @@ public class OutputPortResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final PortEntity requestPortEntity = new PortEntity();
requestPortEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestPortEntity,
requestRevision,
lookup -> {
final Authorizable outputPort = lookup.getOutputPort(id);
outputPort.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyDeleteOutputPort(id),
() -> {
(revision, portEntity) -> {
// delete the specified output port
final PortEntity entity = serviceFacade.deleteOutputPort(revision, id);
final PortEntity entity = serviceFacade.deleteOutputPort(revision, portEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -354,21 +354,20 @@ public class ProcessorResource extends ApplicationResource {
return replicate(HttpMethod.POST);
}
final boolean isValidationPhase = isValidationPhase(httpServletRequest);
if (isValidationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final ProcessorEntity requestProcessorEntity = new ProcessorEntity();
requestProcessorEntity.setId(id);
return withWriteLock(
serviceFacade,
requestProcessorEntity,
lookup -> {
final Authorizable processor = lookup.getProcessor(id).getAuthorizable();
processor.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
if (isValidationPhase) {
serviceFacade.verifyCanClearProcessorState(id);
return generateContinueResponse().build();
}
},
() -> serviceFacade.verifyCanClearProcessorState(id),
(processorEntity) -> {
// get the component state
serviceFacade.clearProcessorState(id);
serviceFacade.clearProcessorState(processorEntity.getId());
// generate the response entity
final ComponentStateEntity entity = new ComponentStateEntity();
@ -376,13 +375,15 @@ public class ProcessorResource extends ApplicationResource {
// generate the response
return clusterContext(generateOkResponse(entity)).build();
}
);
}
/**
* Updates the specified processor with the specified values.
*
* @param httpServletRequest request
* @param id The id of the processor to update.
* @param processorEntity A processorEntity.
* @param requestProcessorEntity A processorEntity.
* @return A processorEntity.
* @throws InterruptedException if interrupted
*/
@ -417,32 +418,33 @@ public class ProcessorResource extends ApplicationResource {
@ApiParam(
value = "The processor configuration details.",
required = true
) final ProcessorEntity processorEntity) throws InterruptedException {
) final ProcessorEntity requestProcessorEntity) throws InterruptedException {
if (processorEntity == null || processorEntity.getComponent() == null) {
if (requestProcessorEntity == null || requestProcessorEntity.getComponent() == null) {
throw new IllegalArgumentException("Processor details must be specified.");
}
if (processorEntity.getRevision() == null) {
if (requestProcessorEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the same id is being used
final ProcessorDTO requestProcessorDTO = processorEntity.getComponent();
final ProcessorDTO requestProcessorDTO = requestProcessorEntity.getComponent();
if (!id.equals(requestProcessorDTO.getId())) {
throw new IllegalArgumentException(String.format("The processor id (%s) in the request body does "
+ "not equal the processor id of the requested resource (%s).", requestProcessorDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, processorEntity);
return replicate(HttpMethod.PUT, requestProcessorEntity);
}
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(processorEntity, id);
final Revision requestRevision = getRevision(requestProcessorEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestProcessorEntity,
requestRevision,
lookup -> {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
@ -455,9 +457,11 @@ public class ProcessorResource extends ApplicationResource {
}
},
() -> serviceFacade.verifyUpdateProcessor(requestProcessorDTO),
() -> {
(revision, processorEntity) -> {
final ProcessorDTO processorDTO = processorEntity.getComponent();
// update the processor
final ProcessorEntity entity = serviceFacade.updateProcessor(revision, requestProcessorDTO);
final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
populateRemainingProcessorEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -517,18 +521,22 @@ public class ProcessorResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final ProcessorEntity requestProcessorEntity = new ProcessorEntity();
requestProcessorEntity.setId(id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestProcessorEntity,
requestRevision,
lookup -> {
final Authorizable processor = lookup.getProcessor(id).getAuthorizable();
processor.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyDeleteProcessor(id),
() -> {
(revision, processorEntity) -> {
// delete the processor
final ProcessorEntity entity = serviceFacade.deleteProcessor(revision, id);
final ProcessorEntity entity = serviceFacade.deleteProcessor(revision, processorEntity.getId());
// generate the response
return clusterContext(generateOkResponse(entity)).build();

View File

@ -33,12 +33,12 @@ import org.apache.nifi.authorization.UserContextKeys;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ProvenanceOptionsEntity;
@ -164,7 +164,7 @@ public class ProvenanceResource extends ApplicationResource {
* Creates provenance using the specified query criteria.
*
* @param httpServletRequest request
* @param provenanceEntity A provenanceEntity
* @param requestProvenanceEntity A provenanceEntity
* @return A provenanceEntity
*/
@POST
@ -196,20 +196,20 @@ public class ProvenanceResource extends ApplicationResource {
@ApiParam(
value = "The provenance query details.",
required = true
) ProvenanceEntity provenanceEntity) {
authorizeProvenanceRequest();
) ProvenanceEntity requestProvenanceEntity) {
// check the request
if (provenanceEntity == null) {
provenanceEntity = new ProvenanceEntity();
if (requestProvenanceEntity == null) {
requestProvenanceEntity = new ProvenanceEntity();
}
// get the provenance
ProvenanceDTO provenanceDto = provenanceEntity.getProvenance();
if (provenanceDto == null) {
provenanceDto = new ProvenanceDTO();
provenanceEntity.setProvenance(provenanceDto);
final ProvenanceDTO requestProvenanceDto;
if (requestProvenanceEntity.getProvenance() != null) {
requestProvenanceDto = requestProvenanceEntity.getProvenance();
} else {
requestProvenanceDto = new ProvenanceDTO();
requestProvenanceEntity.setProvenance(requestProvenanceDto);
}
// replicate if cluster manager
@ -219,33 +219,35 @@ public class ProvenanceResource extends ApplicationResource {
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
// determine where this request should be sent
if (provenanceDto.getRequest() == null || provenanceDto.getRequest().getClusterNodeId() == null) {
if (requestProvenanceDto.getRequest() == null || requestProvenanceDto.getRequest().getClusterNodeId() == null) {
// replicate to all nodes
return replicate(HttpMethod.POST, provenanceEntity, headersToOverride);
return replicate(HttpMethod.POST, requestProvenanceEntity, headersToOverride);
} else {
return replicate(HttpMethod.POST, provenanceEntity, provenanceDto.getRequest().getClusterNodeId(), headersToOverride);
return replicate(HttpMethod.POST, requestProvenanceEntity, requestProvenanceDto.getRequest().getClusterNodeId(), headersToOverride);
}
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
return withWriteLock(
serviceFacade,
requestProvenanceEntity,
lookup -> authorizeProvenanceRequest(),
null,
(provenanceEntity) -> {
final ProvenanceDTO provenanceDTO = provenanceEntity.getProvenance();
// ensure the id is the same across the cluster
final String provenanceId = generateUuid();
// set the provenance id accordingly
provenanceDto.setId(provenanceId);
provenanceDTO.setId(provenanceId);
// submit the provenance request
final ProvenanceDTO dto = serviceFacade.submitProvenance(provenanceDto);
final ProvenanceDTO dto = serviceFacade.submitProvenance(provenanceDTO);
populateRemainingProvenanceContent(dto);
// set the cluster id if necessary
if (provenanceDto.getRequest() != null && provenanceDto.getRequest().getClusterNodeId() != null) {
dto.getRequest().setClusterNodeId(provenanceDto.getRequest().getClusterNodeId());
if (provenanceDTO.getRequest() != null && provenanceDTO.getRequest().getClusterNodeId() != null) {
dto.getRequest().setClusterNodeId(provenanceDTO.getRequest().getClusterNodeId());
}
// create the response entity
@ -255,6 +257,8 @@ public class ProvenanceResource extends ApplicationResource {
// generate the response
return clusterContext(generateCreatedResponse(URI.create(dto.getUri()), entity)).build();
}
);
}
/**
* Gets the provenance with the specified id.
@ -363,8 +367,6 @@ public class ProvenanceResource extends ApplicationResource {
)
@PathParam("id") final String id) {
authorizeProvenanceRequest();
// replicate if cluster manager
if (isReplicateRequest()) {
// determine where this request should be sent
@ -376,20 +378,22 @@ public class ProvenanceResource extends ApplicationResource {
}
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
final ComponentEntity requestEntity = new ComponentEntity();
requestEntity.setId(id);
return withWriteLock(
serviceFacade,
requestEntity,
lookup -> authorizeProvenanceRequest(),
null,
(entity) -> {
// delete the provenance
serviceFacade.deleteProvenance(id);
// create the response entity
final ProvenanceEntity entity = new ProvenanceEntity();
serviceFacade.deleteProvenance(entity.getId());
// generate the response
return clusterContext(generateOkResponse(entity)).build();
return clusterContext(generateOkResponse(new ProvenanceEntity())).build();
}
);
}
/**
@ -401,7 +405,7 @@ public class ProvenanceResource extends ApplicationResource {
* When querying for the lineage of a flowfile you must specify the uuid. The eventId and eventDirection cannot be specified in this case.
*
* @param httpServletRequest request
* @param lineageEntity A lineageEntity
* @param requestLineageEntity A lineageEntity
* @return A lineageEntity
*/
@POST
@ -434,17 +438,15 @@ public class ProvenanceResource extends ApplicationResource {
@ApiParam(
value = "The lineage query details.",
required = true
) final LineageEntity lineageEntity) {
) final LineageEntity requestLineageEntity) {
authorizeProvenanceRequest();
if (lineageEntity == null || lineageEntity.getLineage() == null || lineageEntity.getLineage().getRequest() == null) {
if (requestLineageEntity == null || requestLineageEntity.getLineage() == null || requestLineageEntity.getLineage().getRequest() == null) {
throw new IllegalArgumentException("Lineage request must be specified.");
}
// ensure the request is well formed
final LineageDTO lineageDto = lineageEntity.getLineage();
final LineageRequestDTO requestDto = lineageDto.getRequest();
final LineageDTO requestLineageDto = requestLineageEntity.getLineage();
final LineageRequestDTO requestDto = requestLineageDto.getRequest();
// ensure the type has been specified
if (requestDto.getLineageRequestType() == null) {
@ -477,18 +479,20 @@ public class ProvenanceResource extends ApplicationResource {
// change content type to JSON for serializing entity
final Map<String, String> headersToOverride = new HashMap<>();
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
return replicate(HttpMethod.POST, lineageEntity, requestDto.getClusterNodeId(), headersToOverride);
return replicate(HttpMethod.POST, requestLineageEntity, requestDto.getClusterNodeId(), headersToOverride);
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
return withWriteLock(
serviceFacade,
requestLineageEntity,
lookup -> authorizeProvenanceRequest(),
null,
(lineageEntity) -> {
final LineageDTO lineageDTO = lineageEntity.getLineage();
// get the provenance event
final LineageDTO dto = serviceFacade.submitLineage(lineageDto);
dto.getRequest().setClusterNodeId(requestDto.getClusterNodeId());
final LineageDTO dto = serviceFacade.submitLineage(lineageDTO);
dto.getRequest().setClusterNodeId(lineageDTO.getRequest().getClusterNodeId());
populateRemainingLineageContent(dto);
// create a response entity
@ -498,6 +502,8 @@ public class ProvenanceResource extends ApplicationResource {
// generate the response
return clusterContext(generateOkResponse(entity)).build();
}
);
}
/**
* Gets the lineage with the specified id.
@ -600,27 +606,27 @@ public class ProvenanceResource extends ApplicationResource {
)
@PathParam("id") final String id) {
authorizeProvenanceRequest();
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.DELETE, clusterNodeId);
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
final ComponentEntity requestEntity = new ComponentEntity();
requestEntity.setId(id);
return withWriteLock(
serviceFacade,
requestEntity,
lookup -> authorizeProvenanceRequest(),
null,
(entity) -> {
// delete the lineage
serviceFacade.deleteLineage(id);
// create the response entity
final LineageEntity entity = new LineageEntity();
serviceFacade.deleteLineage(entity.getId());
// generate the response
return clusterContext(generateOkResponse(entity)).build();
return clusterContext(generateOkResponse(new LineageEntity())).build();
}
);
}
// setters

View File

@ -192,18 +192,22 @@ public class RemoteProcessGroupResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final RemoteProcessGroupEntity requestRemoteProcessGroupEntity = new RemoteProcessGroupEntity();
requestRemoteProcessGroupEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestRemoteProcessGroupEntity,
requestRevision,
lookup -> {
final Authorizable remoteProcessGroup = lookup.getRemoteProcessGroup(id);
remoteProcessGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyDeleteRemoteProcessGroup(id),
() -> {
final RemoteProcessGroupEntity entity = serviceFacade.deleteRemoteProcessGroup(revision, id);
(revision, remoteProcessGroupEntity) -> {
final RemoteProcessGroupEntity entity = serviceFacade.deleteRemoteProcessGroup(revision, remoteProcessGroupEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);
@ -215,7 +219,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
* @param httpServletRequest request
* @param id The id of the remote process group to update.
* @param portId The id of the input port to update.
* @param remoteProcessGroupPortEntity The remoteProcessGroupPortEntity
* @param requestRemoteProcessGroupPortEntity The remoteProcessGroupPortEntity
* @return A remoteProcessGroupPortEntity
*/
@PUT
@ -241,20 +245,31 @@ public class RemoteProcessGroupResource extends ApplicationResource {
)
public Response updateRemoteProcessGroupInputPort(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(
value = "The remote process group id.",
required = true
)
@PathParam("id") final String id,
@ApiParam(
value = "The remote process group port id.",
required = true
)
@PathParam("port-id") final String portId,
final RemoteProcessGroupPortEntity remoteProcessGroupPortEntity) {
@ApiParam(
value = "The remote process group port.",
required = true
) final RemoteProcessGroupPortEntity requestRemoteProcessGroupPortEntity) {
if (remoteProcessGroupPortEntity == null || remoteProcessGroupPortEntity.getRemoteProcessGroupPort() == null) {
if (requestRemoteProcessGroupPortEntity == null || requestRemoteProcessGroupPortEntity.getRemoteProcessGroupPort() == null) {
throw new IllegalArgumentException("Remote process group port details must be specified.");
}
if (remoteProcessGroupPortEntity.getRevision() == null) {
if (requestRemoteProcessGroupPortEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final RemoteProcessGroupPortDTO requestRemoteProcessGroupPort = remoteProcessGroupPortEntity.getRemoteProcessGroupPort();
final RemoteProcessGroupPortDTO requestRemoteProcessGroupPort = requestRemoteProcessGroupPortEntity.getRemoteProcessGroupPort();
if (!portId.equals(requestRemoteProcessGroupPort.getId())) {
throw new IllegalArgumentException(String.format("The remote process group port id (%s) in the request body does not equal the "
+ "remote process group port id of the requested resource (%s).", requestRemoteProcessGroupPort.getId(), portId));
@ -267,21 +282,24 @@ public class RemoteProcessGroupResource extends ApplicationResource {
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, remoteProcessGroupPortEntity);
return replicate(HttpMethod.PUT, requestRemoteProcessGroupPortEntity);
}
final Revision revision = getRevision(remoteProcessGroupPortEntity, id);
final Revision requestRevision = getRevision(requestRemoteProcessGroupPortEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestRemoteProcessGroupPortEntity,
requestRevision,
lookup -> {
final Authorizable remoteProcessGroupInputPort = lookup.getRemoteProcessGroupInputPort(id, portId);
remoteProcessGroupInputPort.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyUpdateRemoteProcessGroupInputPort(id, requestRemoteProcessGroupPort),
() -> {
(revision, remoteProcessGroupPortEntity) -> {
final RemoteProcessGroupPortDTO remoteProcessGroupPort = remoteProcessGroupPortEntity.getRemoteProcessGroupPort();
// update the specified remote process group
final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupInputPort(revision, id, requestRemoteProcessGroupPort);
final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupInputPort(revision, remoteProcessGroupPort.getId(), remoteProcessGroupPort);
// get the updated revision
final RevisionDTO updatedRevision = controllerResponse.getRevision();
@ -302,7 +320,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
* @param httpServletRequest request
* @param id The id of the remote process group to update.
* @param portId The id of the output port to update.
* @param remoteProcessGroupPortEntity The remoteProcessGroupPortEntity
* @param requestRemoteProcessGroupPortEntity The remoteProcessGroupPortEntity
* @return A remoteProcessGroupPortEntity
*/
@PUT
@ -328,20 +346,31 @@ public class RemoteProcessGroupResource extends ApplicationResource {
)
public Response updateRemoteProcessGroupOutputPort(
@Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "The remote process group id.",
required = true
)
@PathParam("id") String id,
@ApiParam(
value = "The remote process group port id.",
required = true
)
@PathParam("port-id") String portId,
RemoteProcessGroupPortEntity remoteProcessGroupPortEntity) {
@ApiParam(
value = "The remote process group port.",
required = true
) RemoteProcessGroupPortEntity requestRemoteProcessGroupPortEntity) {
if (remoteProcessGroupPortEntity == null || remoteProcessGroupPortEntity.getRemoteProcessGroupPort() == null) {
if (requestRemoteProcessGroupPortEntity == null || requestRemoteProcessGroupPortEntity.getRemoteProcessGroupPort() == null) {
throw new IllegalArgumentException("Remote process group port details must be specified.");
}
if (remoteProcessGroupPortEntity.getRevision() == null) {
if (requestRemoteProcessGroupPortEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final RemoteProcessGroupPortDTO requestRemoteProcessGroupPort = remoteProcessGroupPortEntity.getRemoteProcessGroupPort();
final RemoteProcessGroupPortDTO requestRemoteProcessGroupPort = requestRemoteProcessGroupPortEntity.getRemoteProcessGroupPort();
if (!portId.equals(requestRemoteProcessGroupPort.getId())) {
throw new IllegalArgumentException(String.format("The remote process group port id (%s) in the request body does not equal the "
+ "remote process group port id of the requested resource (%s).", requestRemoteProcessGroupPort.getId(), portId));
@ -354,22 +383,25 @@ public class RemoteProcessGroupResource extends ApplicationResource {
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, remoteProcessGroupPortEntity);
return replicate(HttpMethod.PUT, requestRemoteProcessGroupPortEntity);
}
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(remoteProcessGroupPortEntity, id);
final Revision requestRevision = getRevision(requestRemoteProcessGroupPortEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestRemoteProcessGroupPortEntity,
requestRevision,
lookup -> {
final Authorizable remoteProcessGroupOutputPort = lookup.getRemoteProcessGroupOutputPort(id, portId);
remoteProcessGroupOutputPort.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyUpdateRemoteProcessGroupOutputPort(id, requestRemoteProcessGroupPort),
() -> {
(revision, remoteProcessGroupPortEntity) -> {
final RemoteProcessGroupPortDTO remoteProcessGroupPort = remoteProcessGroupPortEntity.getRemoteProcessGroupPort();
// update the specified remote process group
final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupOutputPort(revision, id, requestRemoteProcessGroupPort);
final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupOutputPort(revision, remoteProcessGroupPort.getId(), remoteProcessGroupPort);
// get the updated revision
final RevisionDTO updatedRevision = controllerResponse.getRevision();
@ -389,7 +421,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the remote process group to update.
* @param remoteProcessGroupEntity A remoteProcessGroupEntity.
* @param requestRemoteProcessGroupEntity A remoteProcessGroupEntity.
* @return A remoteProcessGroupEntity.
*/
@PUT
@ -414,59 +446,69 @@ public class RemoteProcessGroupResource extends ApplicationResource {
)
public Response updateRemoteProcessGroup(
@Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "The remote process group id.",
required = true
)
@PathParam("id") String id,
RemoteProcessGroupEntity remoteProcessGroupEntity) {
@ApiParam(
value = "The remote process group.",
required = true
) final RemoteProcessGroupEntity requestRemoteProcessGroupEntity) {
if (remoteProcessGroupEntity == null || remoteProcessGroupEntity.getComponent() == null) {
if (requestRemoteProcessGroupEntity == null || requestRemoteProcessGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("Remote process group details must be specified.");
}
if (remoteProcessGroupEntity.getRevision() == null) {
if (requestRemoteProcessGroupEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final RemoteProcessGroupDTO requestRemoteProcessGroup = remoteProcessGroupEntity.getComponent();
final RemoteProcessGroupDTO requestRemoteProcessGroup = requestRemoteProcessGroupEntity.getComponent();
if (!id.equals(requestRemoteProcessGroup.getId())) {
throw new IllegalArgumentException(String.format("The remote process group id (%s) in the request body does not equal the "
+ "remote process group id of the requested resource (%s).", requestRemoteProcessGroup.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, remoteProcessGroupEntity);
return replicate(HttpMethod.PUT, requestRemoteProcessGroupEntity);
}
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(remoteProcessGroupEntity, id);
final Revision requestRevision = getRevision(requestRemoteProcessGroupEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestRemoteProcessGroupEntity,
requestRevision,
lookup -> {
Authorizable authorizable = lookup.getRemoteProcessGroup(id);
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyUpdateRemoteProcessGroup(requestRemoteProcessGroup),
() -> {
(revision, remoteProcessGroupEntity) -> {
final RemoteProcessGroupDTO remoteProcessGroup = remoteProcessGroupEntity.getComponent();
// if the target uri is set we have to verify it here - we don't support updating the target uri on
// an existing remote process group, however if the remote process group is being created with an id
// as is the case in clustered mode we need to verify the remote process group. treat this request as
// though its a new remote process group.
if (requestRemoteProcessGroup.getTargetUri() != null) {
if (remoteProcessGroup.getTargetUri() != null) {
// parse the uri
final URI uri;
try {
uri = URI.create(requestRemoteProcessGroup.getTargetUri());
uri = URI.create(remoteProcessGroup.getTargetUri());
} catch (final IllegalArgumentException e) {
throw new IllegalArgumentException("The specified remote process group URL is malformed: " + requestRemoteProcessGroup.getTargetUri());
throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroup.getTargetUri());
}
// validate each part of the uri
if (uri.getScheme() == null || uri.getHost() == null) {
throw new IllegalArgumentException("The specified remote process group URL is malformed: " + requestRemoteProcessGroup.getTargetUri());
throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroup.getTargetUri());
}
if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) {
throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + requestRemoteProcessGroup.getTargetUri());
throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + remoteProcessGroup.getTargetUri());
}
// normalize the uri to the other controller
@ -476,11 +518,11 @@ public class RemoteProcessGroupResource extends ApplicationResource {
}
// update with the normalized uri
requestRemoteProcessGroup.setTargetUri(controllerUri);
remoteProcessGroup.setTargetUri(controllerUri);
}
// update the specified remote process group
final RemoteProcessGroupEntity entity = serviceFacade.updateRemoteProcessGroup(revision, requestRemoteProcessGroup);
final RemoteProcessGroupEntity entity = serviceFacade.updateRemoteProcessGroup(revision, remoteProcessGroup);
populateRemainingRemoteProcessGroupEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();

View File

@ -333,21 +333,20 @@ public class ReportingTaskResource extends ApplicationResource {
return replicate(HttpMethod.POST);
}
final boolean isValidationPhase = isValidationPhase(httpServletRequest);
if (isValidationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final ReportingTaskEntity requestReportTaskEntity = new ReportingTaskEntity();
requestReportTaskEntity.setId(id);
return withWriteLock(
serviceFacade,
requestReportTaskEntity,
lookup -> {
final Authorizable processor = lookup.getReportingTask(id).getAuthorizable();
processor.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
if (isValidationPhase) {
serviceFacade.verifyCanClearReportingTaskState(id);
return generateContinueResponse().build();
}
},
() -> serviceFacade.verifyCanClearReportingTaskState(id),
(reportingTaskEntity) -> {
// get the component state
serviceFacade.clearReportingTaskState(id);
serviceFacade.clearReportingTaskState(reportingTaskEntity.getId());
// generate the response entity
final ComponentStateEntity entity = new ComponentStateEntity();
@ -355,13 +354,15 @@ public class ReportingTaskResource extends ApplicationResource {
// generate the response
return clusterContext(generateOkResponse(entity)).build();
}
);
}
/**
* Updates the specified a Reporting Task.
*
* @param httpServletRequest request
* @param id The id of the reporting task to update.
* @param reportingTaskEntity A reportingTaskEntity.
* @param requestReportingTaskEntity A reportingTaskEntity.
* @return A reportingTaskEntity.
*/
@PUT
@ -395,32 +396,33 @@ public class ReportingTaskResource extends ApplicationResource {
@ApiParam(
value = "The reporting task configuration details.",
required = true
) final ReportingTaskEntity reportingTaskEntity) {
) final ReportingTaskEntity requestReportingTaskEntity) {
if (reportingTaskEntity == null || reportingTaskEntity.getComponent() == null) {
if (requestReportingTaskEntity == null || requestReportingTaskEntity.getComponent() == null) {
throw new IllegalArgumentException("Reporting task details must be specified.");
}
if (reportingTaskEntity.getRevision() == null) {
if (requestReportingTaskEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final ReportingTaskDTO requestReportingTaskDTO = reportingTaskEntity.getComponent();
final ReportingTaskDTO requestReportingTaskDTO = requestReportingTaskEntity.getComponent();
if (!id.equals(requestReportingTaskDTO.getId())) {
throw new IllegalArgumentException(String.format("The reporting task id (%s) in the request body does not equal the "
+ "reporting task id of the requested resource (%s).", requestReportingTaskDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, reportingTaskEntity);
return replicate(HttpMethod.PUT, requestReportingTaskEntity);
}
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(reportingTaskEntity, id);
final Revision requestRevision = getRevision(requestReportingTaskEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestReportingTaskEntity,
requestRevision,
lookup -> {
// authorize reporting task
final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getReportingTask(id);
@ -430,9 +432,11 @@ public class ReportingTaskResource extends ApplicationResource {
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestReportingTaskDTO.getProperties(), authorizable, authorizer, lookup);
},
() -> serviceFacade.verifyUpdateReportingTask(requestReportingTaskDTO),
() -> {
(revision, reportingTaskEntity) -> {
final ReportingTaskDTO reportingTaskDTO = reportingTaskEntity.getComponent();
// update the reporting task
final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, requestReportingTaskDTO);
final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDTO);
populateRemainingReportingTaskEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -494,19 +498,23 @@ public class ReportingTaskResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final ReportingTaskEntity requestReportingTaskEntity = new ReportingTaskEntity();
requestReportingTaskEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestReportingTaskEntity,
requestRevision,
lookup -> {
final Authorizable reportingTask = lookup.getReportingTask(id).getAuthorizable();
reportingTask.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyDeleteReportingTask(id),
() -> {
(revision, reportingTaskEntity) -> {
// delete the specified reporting task
final ReportingTaskEntity entity = serviceFacade.deleteReportingTask(revision, id);
final ReportingTaskEntity entity = serviceFacade.deleteReportingTask(revision, reportingTaskEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -30,6 +30,7 @@ import org.apache.nifi.controller.Snippet;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.entity.SnippetEntity;
import javax.servlet.http.HttpServletRequest;
@ -94,7 +95,7 @@ public class SnippetResource extends ApplicationResource {
* Creates a snippet based off the specified configuration.
*
* @param httpServletRequest request
* @param snippetEntity A snippetEntity
* @param requestSnippetEntity A snippetEntity
* @return A snippetEntity
*/
@POST
@ -122,26 +123,25 @@ public class SnippetResource extends ApplicationResource {
value = "The snippet configuration details.",
required = true
)
final SnippetEntity snippetEntity) {
final SnippetEntity requestSnippetEntity) {
if (snippetEntity == null || snippetEntity.getSnippet() == null) {
if (requestSnippetEntity == null || requestSnippetEntity.getSnippet() == null) {
throw new IllegalArgumentException("Snippet details must be specified.");
}
if (snippetEntity.getSnippet().getId() != null) {
if (requestSnippetEntity.getSnippet().getId() != null) {
throw new IllegalArgumentException("Snippet ID cannot be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, snippetEntity);
return replicate(HttpMethod.POST, requestSnippetEntity);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final SnippetDTO snippet = snippetEntity.getSnippet();
return withWriteLock(
serviceFacade,
requestSnippetEntity,
lookup -> {
final SnippetDTO snippet = requestSnippetEntity.getSnippet();
// the snippet being created may be used later for batch component modifications,
// copy/paste, or template creation. during those subsequent actions, the snippet
@ -154,12 +154,9 @@ public class SnippetResource extends ApplicationResource {
} catch (final AccessDeniedException e) {
authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE);
}
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
},
null,
(snippetEntity) -> {
// set the processor id as appropriate
snippetEntity.getSnippet().setId(generateUuid());
@ -170,13 +167,15 @@ public class SnippetResource extends ApplicationResource {
// build the response
return clusterContext(generateCreatedResponse(URI.create(entity.getSnippet().getUri()), entity)).build();
}
);
}
/**
* Move's the components in this Snippet into a new Process Group.
*
* @param httpServletRequest request
* @param snippetId The id of the snippet.
* @param snippetEntity A snippetEntity
* @param requestSnippetEntity A snippetEntity
* @return A snippetEntity
*/
@PUT
@ -210,28 +209,29 @@ public class SnippetResource extends ApplicationResource {
@ApiParam(
value = "The snippet configuration details.",
required = true
) final SnippetEntity snippetEntity) {
) final SnippetEntity requestSnippetEntity) {
if (snippetEntity == null || snippetEntity.getSnippet() == null) {
if (requestSnippetEntity == null || requestSnippetEntity.getSnippet() == null) {
throw new IllegalArgumentException("Snippet details must be specified.");
}
// ensure the ids are the same
final SnippetDTO requestSnippetDTO = snippetEntity.getSnippet();
final SnippetDTO requestSnippetDTO = requestSnippetEntity.getSnippet();
if (!snippetId.equals(requestSnippetDTO.getId())) {
throw new IllegalArgumentException(String.format("The snippet id (%s) in the request body does not equal the "
+ "snippet id of the requested resource (%s).", requestSnippetDTO.getId(), snippetId));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, snippetEntity);
return replicate(HttpMethod.PUT, requestSnippetEntity);
}
// get the revision from this snippet
final Set<Revision> revisions = serviceFacade.getRevisionsFromSnippet(snippetId);
final Set<Revision> requestRevisions = serviceFacade.getRevisionsFromSnippet(snippetId);
return withWriteLock(
serviceFacade,
revisions,
requestSnippetEntity,
requestRevisions,
lookup -> {
// ensure write access to the target process group
if (requestSnippetDTO.getParentGroupId() != null) {
@ -242,8 +242,8 @@ public class SnippetResource extends ApplicationResource {
final Snippet snippet = lookup.getSnippet(snippetId);
authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE);
},
() -> serviceFacade.verifyUpdateSnippet(requestSnippetDTO, revisions.stream().map(rev -> rev.getComponentId()).collect(Collectors.toSet())),
() -> {
() -> serviceFacade.verifyUpdateSnippet(requestSnippetDTO, requestRevisions.stream().map(rev -> rev.getComponentId()).collect(Collectors.toSet())),
(revisions, snippetEntity) -> {
// update the snippet
final SnippetEntity entity = serviceFacade.updateSnippet(revisions, snippetEntity.getSnippet());
populateRemainingSnippetEntityContent(entity);
@ -291,20 +291,24 @@ public class SnippetResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final ComponentEntity requestEntity = new ComponentEntity();
requestEntity.setId(snippetId);
// get the revision from this snippet
final Set<Revision> revisions = serviceFacade.getRevisionsFromSnippet(snippetId);
final Set<Revision> requestRevisions = serviceFacade.getRevisionsFromSnippet(snippetId);
return withWriteLock(
serviceFacade,
revisions,
requestEntity,
requestRevisions,
lookup -> {
// ensure read permission to every component in the snippet
final Snippet snippet = lookup.getSnippet(snippetId);
authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE);
},
() -> serviceFacade.verifyDeleteSnippet(snippetId, revisions.stream().map(rev -> rev.getComponentId()).collect(Collectors.toSet())),
() -> {
() -> serviceFacade.verifyDeleteSnippet(snippetId, requestRevisions.stream().map(rev -> rev.getComponentId()).collect(Collectors.toSet())),
(revisions, entity) -> {
// delete the specified snippet
final SnippetEntity snippetEntity = serviceFacade.deleteSnippet(revisions, snippetId);
final SnippetEntity snippetEntity = serviceFacade.deleteSnippet(revisions, entity.getId());
return clusterContext(generateOkResponse(snippetEntity)).build();
}
);

View File

@ -189,25 +189,28 @@ public class TemplateResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final TemplateEntity requestTemplateEntity = new TemplateEntity();
requestTemplateEntity.setId(id);
return withWriteLock(
serviceFacade,
requestTemplateEntity,
lookup -> {
final Authorizable template = lookup.getTemplate(id);
template.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
return generateContinueResponse().build();
}
},
null,
(templateEntity) -> {
// delete the specified template
serviceFacade.deleteTemplate(id);
serviceFacade.deleteTemplate(templateEntity.getId());
// build the response entity
final TemplateEntity entity = new TemplateEntity();
return clusterContext(generateOkResponse(entity)).build();
}
);
}
// setters

View File

@ -115,7 +115,7 @@ public class TenantsResource extends ApplicationResource {
* Creates a new user.
*
* @param httpServletRequest request
* @param userEntity An userEntity.
* @param requestUserEntity An userEntity.
* @return An userEntity.
*/
@POST
@ -144,48 +144,44 @@ public class TenantsResource extends ApplicationResource {
@ApiParam(
value = "The user configuration details.",
required = true
) final UserEntity userEntity) {
) final UserEntity requestUserEntity) {
// ensure we're running with a configurable authorizer
if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) {
throw new IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER);
}
if (userEntity == null || userEntity.getComponent() == null) {
if (requestUserEntity == null || requestUserEntity.getComponent() == null) {
throw new IllegalArgumentException("User details must be specified.");
}
if (userEntity.getRevision() == null || (userEntity.getRevision().getVersion() == null || userEntity.getRevision().getVersion() != 0)) {
if (requestUserEntity.getRevision() == null || (requestUserEntity.getRevision().getVersion() == null || requestUserEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new User.");
}
if (userEntity.getComponent().getId() != null) {
if (requestUserEntity.getComponent().getId() != null) {
throw new IllegalArgumentException("User ID cannot be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, userEntity);
return replicate(HttpMethod.POST, requestUserEntity);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable tenants = lookup.getTenant();
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
// set the user id as appropriate
userEntity.getComponent().setId(generateUuid());
// get revision from the config
final RevisionDTO revisionDTO = userEntity.getRevision();
Revision revision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), userEntity.getComponent().getId());
final RevisionDTO revisionDTO = requestUserEntity.getRevision();
Revision requestRevision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), requestUserEntity.getComponent().getId());
return withWriteLock(
serviceFacade,
requestUserEntity,
requestRevision,
lookup -> {
final Authorizable tenants = lookup.getTenant();
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
(revision, userEntity) -> {
// set the user id as appropriate
userEntity.getComponent().setId(generateUuid());
// create the user and generate the json
final UserEntity entity = serviceFacade.createUser(revision, userEntity.getComponent());
@ -194,6 +190,8 @@ public class TenantsResource extends ApplicationResource {
// build the response
return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build();
}
);
}
/**
* Retrieves the specified user.
@ -311,7 +309,7 @@ public class TenantsResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the user to update.
* @param userEntity An userEntity.
* @param requestUserEntity An userEntity.
* @return An userEntity.
*/
@PUT
@ -345,45 +343,46 @@ public class TenantsResource extends ApplicationResource {
@ApiParam(
value = "The user configuration details.",
required = true
) final UserEntity userEntity) {
) final UserEntity requestUserEntity) {
// ensure we're running with a configurable authorizer
if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) {
throw new IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER);
}
if (userEntity == null || userEntity.getComponent() == null) {
if (requestUserEntity == null || requestUserEntity.getComponent() == null) {
throw new IllegalArgumentException("User details must be specified.");
}
if (userEntity.getRevision() == null) {
if (requestUserEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final UserDTO userDTO = userEntity.getComponent();
if (!id.equals(userDTO.getId())) {
final UserDTO requestUserDTO = requestUserEntity.getComponent();
if (!id.equals(requestUserDTO.getId())) {
throw new IllegalArgumentException(String.format("The user id (%s) in the request body does not equal the "
+ "user id of the requested resource (%s).", userDTO.getId(), id));
+ "user id of the requested resource (%s).", requestUserDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, userEntity);
return replicate(HttpMethod.PUT, requestUserEntity);
}
// Extract the revision
final Revision revision = getRevision(userEntity, id);
final Revision requestRevision = getRevision(requestUserEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestUserEntity,
requestRevision,
lookup -> {
final Authorizable tenants = lookup.getTenant();
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
(revision, userEntity) -> {
// update the user
final UserEntity entity = serviceFacade.updateUser(revision, userDTO);
final UserEntity entity = serviceFacade.updateUser(revision, userEntity.getComponent());
populateRemainingUserEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -451,19 +450,23 @@ public class TenantsResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final UserEntity requestUserEntity = new UserEntity();
requestUserEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestUserEntity,
requestRevision,
lookup -> {
final Authorizable tenants = lookup.getTenant();
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
(revision, userEntity) -> {
// delete the specified user
final UserEntity entity = serviceFacade.deleteUser(revision, id);
final UserEntity entity = serviceFacade.deleteUser(revision, userEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);
@ -497,7 +500,7 @@ public class TenantsResource extends ApplicationResource {
* Creates a new user group.
*
* @param httpServletRequest request
* @param userGroupEntity An userGroupEntity.
* @param requestUserGroupEntity An userGroupEntity.
* @return An userGroupEntity.
*/
@POST
@ -526,48 +529,44 @@ public class TenantsResource extends ApplicationResource {
@ApiParam(
value = "The user group configuration details.",
required = true
) final UserGroupEntity userGroupEntity) {
) final UserGroupEntity requestUserGroupEntity) {
// ensure we're running with a configurable authorizer
if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) {
throw new IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER);
}
if (userGroupEntity == null || userGroupEntity.getComponent() == null) {
if (requestUserGroupEntity == null || requestUserGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("User group details must be specified.");
}
if (userGroupEntity.getRevision() == null || (userGroupEntity.getRevision().getVersion() == null || userGroupEntity.getRevision().getVersion() != 0)) {
if (requestUserGroupEntity.getRevision() == null || (requestUserGroupEntity.getRevision().getVersion() == null || requestUserGroupEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new User Group.");
}
if (userGroupEntity.getComponent().getId() != null) {
if (requestUserGroupEntity.getComponent().getId() != null) {
throw new IllegalArgumentException("User group ID cannot be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, userGroupEntity);
return replicate(HttpMethod.POST, requestUserGroupEntity);
}
// handle expects request (usually from the cluster manager)
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable tenants = lookup.getTenant();
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
if (validationPhase) {
return generateContinueResponse().build();
}
// set the user group id as appropriate
userGroupEntity.getComponent().setId(generateUuid());
// get revision from the config
final RevisionDTO revisionDTO = userGroupEntity.getRevision();
Revision revision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), userGroupEntity.getComponent().getId());
final RevisionDTO revisionDTO = requestUserGroupEntity.getRevision();
Revision requestRevision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), requestUserGroupEntity.getComponent().getId());
return withWriteLock(
serviceFacade,
requestUserGroupEntity,
requestRevision,
lookup -> {
final Authorizable tenants = lookup.getTenant();
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
(revision, userGroupEntity) -> {
// set the user group id as appropriate
userGroupEntity.getComponent().setId(generateUuid());
// create the user group and generate the json
final UserGroupEntity entity = serviceFacade.createUserGroup(revision, userGroupEntity.getComponent());
@ -576,6 +575,8 @@ public class TenantsResource extends ApplicationResource {
// build the response
return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build();
}
);
}
/**
* Retrieves the specified user group.
@ -692,7 +693,7 @@ public class TenantsResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param id The id of the user group to update.
* @param userGroupEntity An userGroupEntity.
* @param requestUserGroupEntity An userGroupEntity.
* @return An userGroupEntity.
*/
@PUT
@ -726,45 +727,46 @@ public class TenantsResource extends ApplicationResource {
@ApiParam(
value = "The user group configuration details.",
required = true
) final UserGroupEntity userGroupEntity) {
) final UserGroupEntity requestUserGroupEntity) {
// ensure we're running with a configurable authorizer
if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) {
throw new IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER);
}
if (userGroupEntity == null || userGroupEntity.getComponent() == null) {
if (requestUserGroupEntity == null || requestUserGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("User group details must be specified.");
}
if (userGroupEntity.getRevision() == null) {
if (requestUserGroupEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final UserGroupDTO userGroupDTO = userGroupEntity.getComponent();
if (!id.equals(userGroupDTO.getId())) {
final UserGroupDTO requestUserGroupDTO = requestUserGroupEntity.getComponent();
if (!id.equals(requestUserGroupDTO.getId())) {
throw new IllegalArgumentException(String.format("The user group id (%s) in the request body does not equal the "
+ "user group id of the requested resource (%s).", userGroupDTO.getId(), id));
+ "user group id of the requested resource (%s).", requestUserGroupDTO.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, userGroupEntity);
return replicate(HttpMethod.PUT, requestUserGroupEntity);
}
// Extract the revision
final Revision revision = getRevision(userGroupEntity, id);
final Revision requestRevision = getRevision(requestUserGroupEntity, id);
return withWriteLock(
serviceFacade,
revision,
requestUserGroupEntity,
requestRevision,
lookup -> {
final Authorizable tenants = lookup.getTenant();
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
(revision, userGroupEntity) -> {
// update the user group
final UserGroupEntity entity = serviceFacade.updateUserGroup(revision, userGroupDTO);
final UserGroupEntity entity = serviceFacade.updateUserGroup(revision, userGroupEntity.getComponent());
populateRemainingUserGroupEntityContent(entity);
return clusterContext(generateOkResponse(entity)).build();
@ -832,19 +834,23 @@ public class TenantsResource extends ApplicationResource {
return replicate(HttpMethod.DELETE);
}
final UserGroupEntity requestUserGroupEntity = new UserGroupEntity();
requestUserGroupEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
revision,
requestUserGroupEntity,
requestRevision,
lookup -> {
final Authorizable tenants = lookup.getTenant();
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
(revision, userGroupEntity) -> {
// delete the specified user group
final UserGroupEntity entity = serviceFacade.deleteUserGroup(revision, id);
final UserGroupEntity entity = serviceFacade.deleteUserGroup(revision, userGroupEntity.getId());
return clusterContext(generateOkResponse(entity)).build();
}
);

View File

@ -20,6 +20,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.web.security.token.OtpAuthenticationToken;
import org.apache.nifi.web.security.util.CacheKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,7 +28,6 @@ import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentMap;
@ -129,7 +129,7 @@ public class OtpService {
cache.putIfAbsent(cacheKey, authenticationToken.getName());
// return the token
return cacheKey.getToken();
return cacheKey.getKey();
}
/**
@ -178,42 +178,4 @@ public class OtpService {
throw new IllegalStateException("Unable to generate single use token.");
}
}
/**
* Key for the cache. Necessary to override the default String.equals() to utilize MessageDigest.isEquals() to prevent timing attacks.
*/
private static class CacheKey {
final String token;
public CacheKey(String token) {
this.token = token;
}
public String getToken() {
return token;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final CacheKey otherCacheKey = (CacheKey) o;
return MessageDigest.isEqual(token.getBytes(StandardCharsets.UTF_8), otherCacheKey.token.getBytes(StandardCharsets.UTF_8));
}
@Override
public int hashCode() {
return token.hashCode();
}
@Override
public String toString() {
return "CacheKey{token ending in '..." + token.substring(token.length() - 6) + "'}";
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.security.util;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
/**
* An authentication token that represents an Authenticated and Authorized user of the NiFi Apis. The authorities are based off the specified UserDetails.
*/
/**
* Key for the cache. Necessary to override the default String.equals() to utilize MessageDigest.isEquals() to prevent timing attacks.
*/
public class CacheKey {
final String key;
public CacheKey(String key) {
this.key = key;
}
public String getKey() {
return key;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final CacheKey otherCacheKey = (CacheKey) o;
return MessageDigest.isEqual(key.getBytes(StandardCharsets.UTF_8), otherCacheKey.key.getBytes(StandardCharsets.UTF_8));
}
@Override
public int hashCode() {
return key.hashCode();
}
@Override
public String toString() {
return "CacheKey{token ending in '..." + key.substring(key.length() - 6) + "'}";
}
}

View File

@ -190,7 +190,7 @@ md-toolbar.md-small .md-toolbar-tools {
font-style: normal;
font-weight: normal;
font-size: 12px;
max-width: 130px;
max-width: 250px;
text-overflow: ellipsis;
line-height: normal;
overflow: hidden;