diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index a4b762ae1a..fad7454f5f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -38,7 +38,11 @@ public interface RequestReplicator { public static final String NODE_CONTINUE = "150-NodeContinue"; public static final int NODE_CONTINUE_STATUS_CODE = 150; - public static final String CLAIM_CANCEL_HEADER = "X-Cancel-Claim"; + /** + * Indicates that the request is intended to cancel a lock that was previously obtained without performing the action + */ + public static final String LOCK_CANCELATION_HEADER = "X-Cancel-Lock"; + public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id"; /** * Stops the instance from replicating requests. Calling this method on a stopped instance has no effect. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 3d90b6fee9..7a9b56f129 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -325,6 +325,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private void performVerification(Set nodeIds, String method, URI uri, Object entity, Map headers, StandardAsyncClusterResponse clusterResponse) { logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath()); + // Add the Lock Version ID to the headers so that it is used in all requests for this transaction + final String lockVersionId = UUID.randomUUID().toString(); + headers.put(RequestReplicator.LOCK_VERSION_ID_HEADER, lockVersionId); + final Map updatedHeaders = new HashMap<>(headers); updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE); @@ -361,20 +365,21 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { return; } - final Thread cancelClaimThread = new Thread(new Runnable() { + final Map cancelLockHeaders = new HashMap<>(updatedHeaders); + cancelLockHeaders.put(LOCK_CANCELATION_HEADER, "true"); + final Thread cancelLockThread = new Thread(new Runnable() { @Override public void run() { logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath()); - updatedHeaders.put(CLAIM_CANCEL_HEADER, "true"); final Function requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, null); + nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null); - replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); + replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders); } }); - cancelClaimThread.setName("Cancel Claims"); - cancelClaimThread.start(); + cancelLockThread.setName("Cancel Flow Locks"); + cancelLockThread.start(); // Add a NodeResponse for each node to the Cluster Response // Check that all nodes responded successfully. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java new file mode 100644 index 0000000000..b5c974fb6a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public interface DistributedLock { + + /** + * Obtains a lock, blocking as long as necessary to obtain the lock. + * Once a lock has been obtained, the identifier of the version of the lock is returned, + * which can be passed to the {@link #withLock(String, Callable)} or + * {@link #unlock(String)} method. Once this method returns, it is + * important that either the {@link #withLock(String, Callable)} or + * {@link #unlock(String)} method be called with this identifier. Otherwise, + * any attempt to claim another read lock or write lock will block until this + * lock expires. + * + * @return the identifier + */ + String lock(); + + /** + * Obtains a lock, blocking as long as necessary to obtain the lock. + * Once a lock has been obtained, the identifier of the version of the lock is returned, + * which can be passed to the {@link #withLock(String, Callable)} or + * {@link #unlock(String)} method. Once this method returns, it is + * important that either the {@link #withLock(String, Callable)} or + * {@link #unlock(String)} method be called with this identifier. Otherwise, + * any attempt to claim another read lock or write lock will block until this + * lock expires. + * + * @param versionIdentifier a value that should be used as the version id instead of generating one. + * This allows us to ensure that all nodes in the cluster use the same id. + * + * @return the identifier + */ + String lock(String versionIdentifier); + + /** + * Waits up to the given amount of time to obtain a lock. If the lock is obtained + * within this time period, the identifier will be returned, as with {@link #lock()}. + * If the lock cannot be obtained within the given time period, null will + * be returned. + * + * @param time the maximum amount of time to wait for the lock + * @param timeUnit the unit of time that the time parameter is in + * @return the identifier of the lock, or null if no lock is obtained + */ + String tryLock(long time, TimeUnit timeUnit); + + /** + * Waits up to the given amount of time to obtain a lock. If the lock is obtained + * within this time period, the identifier will be returned, as with {@link #lock()}. + * If the lock cannot be obtained within the given time period, null will + * be returned. + * + * @param time the maximum amount of time to wait for the lock + * @param timeUnit the unit of time that the time parameter is in + * @param versionIdentifier a value that should be used as the version id instead of generating one. + * This allows us to ensure that all nodes in the cluster use the same id. + * @return the identifier of the lock, or null if no lock is obtained + */ + String tryLock(long time, TimeUnit timeUnit, String versionIdentifier); + + /** + * Performs the given action while this lock is held. The identifier of the lock that was + * obtained by calling {@link #lock()} must be provided to this method. If the + * lock identifier is incorrect, or the lock has expired, a {@link LockExpiredException} + * will be thrown. This method provides a mechanism for verifying that the lock obtained + * by {@link #lock()} or {@link #tryLock(long, TimeUnit)} is still valid and that the action + * being performed will be done so without the lock expiring (i.e., if the lock expires while + * the action is being performed, the lock won't be released until the provided action completes). + * + * @param identifier the identifier of the lock that has already been obtained + * @param action the action to perform + * + * @return the value returned by the given action + * + * @throws LockExpiredException if the provided identifier is not the identifier of the currently + * held lock, or if the lock that was obtained has already expired and is no longer valid + */ + T withLock(String identifier, Supplier action) throws LockExpiredException; + + /** + * Cancels the lock with the given identifier, so that the lock is no longer valid. + * + * @param identifier the identifier of the lock that was obtained by calling {@link #lock()}. + * + * @throws LockExpiredException if the provided identifier is not the identifier of the currently + * held lock, or if the lock that was obtained has already expired and is no longer valid + */ + void unlock(String identifier) throws LockExpiredException; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java new file mode 100644 index 0000000000..09c1d5d0b9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.concurrent; + +/** + *

+ * A DistributedLockingManager is responsible for exposing a mechanism that + * clients can use to obtain a lock on the dataflow. + *

+ * + *

+ * Because of the way in which NiFi replicates requests from one node to all + * other nodes in the cluster, it is important that all nodes in the cluster + * are able to obtain a lock for the request before the request is allowed to + * proceed. This is accomplished by using a two-phase approach. For each request + * that will require a lock (either a read (shared) lock or a write (mutually + * exclusive) lock), the request must be done in two phases. The first phase is + * responsible for obtaining the lock and optionally performing validation of + * the request. Once a node has obtained the necessary lock and performed any + * required validation, the node will respond to the web request with a status + * code of 150 - NodeContinue. + *

+ * + *

+ * At this point, the node that originated the request + * will verify that either all nodes obtained a lock or that at least one node + * failed to obtain a lock. If all nodes respond with a 150 - NodeContinue, + * then the second phase of the request will occur. In the second phase, the + * actual logic of the desired request is performed while the lock is held. + * The lock is then released, once the logic is performed (or if the logic fails + * to be performed). + *

+ * + *

+ * In the case that at least one node responds with a status code with than + * 150 - NodeContinue, the node that originated the request will instead issue + * a cancel request for the second phase so that all nodes are able to unlock + * the lock that was previously obtained for the request. + *

+ * + *

+ * A key consideration in this type of approach that must be taken into account + * is that the node that originated the request could, at any point in time, fail + * as a result of the process being killed, power loss, network connectivity problems, + * etc. As a result, the locks that are obtained through a DistributedLockingManager + * are designed to expire after some amount of time, so that locks are not held + * indefinitely, even in the case of node failure. + *

+ */ +public interface DistributedLockingManager { + + DistributedLock getReadLock(); + + DistributedLock getWriteLock(); + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java new file mode 100644 index 0000000000..c673c9f34d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.concurrent; + +public class LockExpiredException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public LockExpiredException(String message) { + super(message); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java index 758529923b..6b6bc4592b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java @@ -19,18 +19,16 @@ package org.apache.nifi.web.revision; import java.util.Collection; import java.util.List; -import java.util.Set; -import java.util.function.Supplier; import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.web.InvalidRevisionException; import org.apache.nifi.web.Revision; +import org.apache.nifi.web.concurrent.DistributedLockingManager; /** *

* A Revision Manager provides the ability to prevent clients of the Web API from - * stepping on one another. This is done by providing claims and locking mechanisms + * stepping on one another. This is done by providing revisions * for components individually. *

* @@ -45,68 +43,27 @@ import org.apache.nifi.web.Revision; * *

* When the first phase of the two-phase commit is processed, the Revision Manager should - * be used to obtain a Revision Claim by calling the {@link #requestClaim(Collection)} - * method. If a Claim is granted, then the request validation may continue. If the - * Claim is not granted, the request should fail and the second phase should not - * be performed. + * be used to verify that the client-provided Revisions are current by calling the + * {@link #verifyRevisions(Collection)} + * method. If the revisions are up-to-date, the method will return successfully and the + * request validation may continue. Otherwise, the request should fail and the second phase + * should not be performed. *

* *

* If the first phase of the above two-phase commit completes and all nodes indicate that the - * request may continue, this means that all nodes have provided granted a Claim on the Revisions - * that are relevant. This Claim will automatically expire after some time. This expiration - * means that if the node that issues the first phase never initiates the second phase (if the node - * dies or loses network connectivitiy, for instance), then the Revision Claim will expire and - * the Revision will remain unchanged. + * request may continue, this means that all nodes have agreed that the client's Revisios are + * acceptable. *

* *

- * When the second phase begins, changes to the resource(s) must be made with the Revisions - * locked. This is accomplished by wrapping the logic in a {@link Runnable} and passing the Runnable, - * along with the {@link RevisionClaim} to the {@link #updateRevision(RevisionClaim, Supplier)} method. + * To ensure that the revisions remain consistent between the time that they are validated and + * the time that the modification takes place, it is important that the revisions always be + * validated while an appropriate read or write lock is held, via the {@link DistributedLockingManager}. *

*/ public interface RevisionManager { - /** - *

- * Attempts to obtain a Revision Claim for Revisions supplied. If a Revision Claim - * is granted, no other thread will be allowed to modify any of the components for - * which a Revision is claimed until either the Revision Claim is relinquished by - * calling the {@link #updateRevision(RevisionClaim, Runnable)} method or the - * {@link #releaseClaim(RevisionClaim)} method, or the Revision Claim expires. - *

- * - *

- * This method is atomic. If a Revision Claim is unable to be obtained for any of the - * provided Revisions, then no Revision Claim will be obtained. - *

- * - * @param revisions a Set of Revisions, each of which corresponds to a different - * component for which a Claim is to be acquired. - * @param user the user for which the claim is being requested - * - * @return the Revision Claim that was granted, if one was granted. - * - * @throws InvalidRevisionException if any of the Revisions provided is out-of-date. - */ - RevisionClaim requestClaim(Collection revisions, NiFiUser user) throws InvalidRevisionException; - - /** - *

- * A convenience method that will call {@link #requestClaim(Collection)} by wrapping the given - * Revision in a Collection - *

- * - * @param revision the revision to request a claim for - * @param user the user for which the claim is being requested - * - * @return the Revision Claim that was granted, if one was granted. - * - * @throws InvalidRevisionException if any of the Revisions provided is out-of-date. - */ - RevisionClaim requestClaim(Revision revision, NiFiUser user) throws InvalidRevisionException; - /** * Returns the current Revision for the component with the given ID. If no Revision yet exists for the * component with the given ID, one will be created with a Version of 0 and no Client ID. @@ -135,7 +92,7 @@ public interface RevisionManager { * * @throws ExpiredRevisionClaimException if the Revision Claim has expired */ - RevisionUpdate updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask task) throws ExpiredRevisionClaimException; + RevisionUpdate updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask task); /** * Performs the given task that is expected to remove a component from the flow. As a result, @@ -151,64 +108,6 @@ public interface RevisionManager { */ T deleteRevision(RevisionClaim claim, NiFiUser user, DeleteRevisionTask task) throws ExpiredRevisionClaimException; - /** - * Performs some operation to obtain an object of type T whose identifier is provided via - * the componentId argument, and return that object of type T while holding a Read Lock on - * the Revision for that object. Note that the callback provided must never modify the object - * with the given ID. - * - * @param callback the callback that is to be performed with the Read Lock held - * @return the value returned from the callback - */ - T get(String componentId, ReadOnlyRevisionCallback callback); - - /** - * Performs some operation to obtain an object of type T whose identifier is provided via - * the componentId argument, and return that object of type T while holding a Read Lock on - * the Revision for that object. Note that the callback provided must never modify the object - * with the given ID. - * - * @param callback the callback that is to be performed with the Read Lock held - * @return the value returned from the callback - */ - T get(Set componentId, Supplier callback); - - /** - * Releases the claims on the revisions held by the given Revision Claim, if all of the Revisions - * are up-to-date. - * - * @param claim the claim that holds the revisions - * @param user the user that is releasing the claim. Must be the same user that claimed the revision. - * - * @return true if the claim was released, false if the Revisions were not - * up-to-date - */ - boolean releaseClaim(RevisionClaim claim, NiFiUser user); - - /** - * Releases the claim on the revision for the given component if the claim was obtained by the calling thread - * - * @param componentId the ID of the component - * @return true if the claim was released, false otherwise - */ - boolean cancelClaim(String componentId); - - /** - * Releases the claim on the given revision if the claim was obtained by the calling thread - * - * @param revision the Revision to cancel the claim for - * @return true if the claim was released, false otherwise - */ - boolean cancelClaim(Revision revision); - - /** - * Releases the claims on the given revisions if the claim was obtained by the calling thread - * - * @param revisions the Revisions to cancel claims for - * @return true if all claims were released, false otherwise - */ - boolean cancelClaims(Set revisions); - /** * Clears any revisions that are currently held and resets the Revision Manager so that the revisions * present are those provided in the given collection diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index dd1f7e03cf..2d093ce1ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,6 +16,14 @@ */ package org.apache.nifi.web; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; + import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.repository.claim.ContentDirection; @@ -90,13 +98,7 @@ import org.apache.nifi.web.api.entity.SnippetEntity; import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; - -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; +import org.apache.nifi.web.concurrent.LockExpiredException; /** * Defines the NiFiServiceFacade interface. @@ -114,6 +116,104 @@ public interface NiFiServiceFacade { */ void authorizeAccess(AuthorizeAccess authorizeAccess); + /** + * Obtains a read (shared) lock for the entire flow, so that no other + * requests can be made to modify the flow until either this read lock + * is released via {@link #releaseReadLock()} or the lock expires + * + * @return an identifier that indicates the version of the lock, so that other + * requests cannot release a lock that was held by this request + */ + String obtainReadLock(); + + /** + * Obtains a read (shared) lock for the entire flow, so that no other + * requests can be made to modify the flow until either this read lock + * is released via {@link #releaseReadLock()} or the lock expires + * + * @param versionId specifies a value to use for the Version ID for the lock + * + * @return an identifier that indicates the version of the lock, so that other + * requests cannot release a lock that was held by this request + */ + String obtainReadLock(String versionId); + + /** + * Performs the given action while holding the read lock that has already been obtained + * with the given versionIdentifier. This allows the given action to be performed without + * allowing the read lock to expire until the entire action has completed. + * + * @param versionIdentifier the identifier that indicates the version of the lock that + * is held. The value that is to be passed here is the value that was returned from the + * call to {@link #obtainReadLock()}. + * @param action the action to perform + * + * @return the value returned by the action + * @throws LockExpiredException if the lock has expired before the action is invoked + * @throws Exception any Exception thrown by the given action is propagated + */ + T withReadLock(String versionIdentifier, Supplier action) throws LockExpiredException; + + /** + * Releases the read lock held on this flow + * + * @param versionIdentifier the identifier that indicates the version of the lock that + * is held. The value that is to be passed here is the value that was returned from the + * call to {@link #obtainReadLock()}. + * + * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid + */ + void releaseReadLock(String versionIdentifier) throws LockExpiredException; + + /** + * Obtains a write (mutually exclusive) lock for the entire flow, so that no other + * requests can be made to read or modify the flow until either this write lock + * is released via {@link #releaseWriteLock()} or the lock expires + * + * @return an identifier that indicates the version of the lock, so that other + * requests cannot release a lock that was held by this request + */ + String obtainWriteLock(); + + /** + * Obtains a write (mutually exclusive) lock for the entire flow, so that no other + * requests can be made to read or modify the flow until either this write lock + * is released via {@link #releaseWriteLock()} or the lock expires + * + * @param versionId specifies a value to use for the Version ID for the lock + * + * @return an identifier that indicates the version of the lock, so that other + * requests cannot release a lock that was held by this request + */ + String obtainWriteLock(String versionId); + + /** + * Performs the given action while holding the write lock that has already been obtained + * with the given versionIdentifier. This allows the given action to be performed without + * allowing the write lock to expire until the entire action has completed. + * + * @param versionIdentifier the identifier that indicates the version of the lock that + * is held. The value that is to be passed here is the value that was returned from the + * call to {@link #obtainWriteLock()}. + * @param action the action to perform + * + * @return the value returned by the action + * @throws LockExpiredException if the lock has expired before the action is invoked + * @throws Exception any Exception thrown by the given action is propagated + */ + T withWriteLock(String versionIdentifier, Supplier action) throws LockExpiredException; + + /** + * Releases the write lock held on the flow + * + * @param versionIdentifier the identifier that indicates the version of the lock that + * is held. The value that is to be passed here is the value that was returned from the + * call to {@link #obtainWriteLock()}. + * + * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid + */ + void releaseWriteLock(String versionIdentifier) throws LockExpiredException; + /** * Claims the specified revision for the specified user. * @@ -121,7 +221,7 @@ public interface NiFiServiceFacade { * @param user user * @throws InvalidRevisionException invalid revision */ - void claimRevision(Revision revision, NiFiUser user) throws InvalidRevisionException; + void verifyRevision(Revision revision, NiFiUser user) throws InvalidRevisionException; /** * Claims the specified revisions for the specified user. @@ -130,41 +230,7 @@ public interface NiFiServiceFacade { * @param user user * @throws InvalidRevisionException invalid revision */ - void claimRevisions(Set revisions, NiFiUser user) throws InvalidRevisionException; - - /** - * Cancels the specified revision. Cancellation is only supported based on the current thread. - * - * @param revision revision - * @throws InvalidRevisionException invalid revision - */ - void cancelRevision(Revision revision) throws InvalidRevisionException; - - /** - * Cancels the specified revisions. Cancellation is only supported based on the current thread. - * - * @param revisions revision - * @throws InvalidRevisionException invalid revision - */ - void cancelRevisions(Set revisions) throws InvalidRevisionException; - - /** - * Releases the claim that is held on the given revision by the given user - * - * @param revision the revision - * @param user the user - * @throws InvalidRevisionException if the revision is invalid - */ - void releaseRevisionClaim(Revision revision, NiFiUser user) throws InvalidRevisionException; - - /** - * Releases the claim that is held on the given revisions by the given user - * - * @param revisions the revisions - * @param user the user - * @throws InvalidRevisionException if the revision is invalid - */ - void releaseRevisionClaims(Set revisions, NiFiUser user) throws InvalidRevisionException; + void verifyRevisions(Set revisions, NiFiUser user) throws InvalidRevisionException; /** * Gets the current revisions for the components based on the specified function. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index da253cadc8..c77778d9d2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,29 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; import org.apache.nifi.action.FlowChangeAction; @@ -161,6 +183,8 @@ import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.TenantEntity; import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; +import org.apache.nifi.web.concurrent.DistributedLockingManager; +import org.apache.nifi.web.concurrent.LockExpiredException; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.AccessPolicyDAO; import org.apache.nifi.web.dao.ConnectionDAO; @@ -178,7 +202,6 @@ import org.apache.nifi.web.dao.UserDAO; import org.apache.nifi.web.dao.UserGroupDAO; import org.apache.nifi.web.revision.DeleteRevisionTask; import org.apache.nifi.web.revision.ExpiredRevisionClaimException; -import org.apache.nifi.web.revision.ReadOnlyRevisionCallback; import org.apache.nifi.web.revision.RevisionClaim; import org.apache.nifi.web.revision.RevisionManager; import org.apache.nifi.web.revision.RevisionUpdate; @@ -189,28 +212,7 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import com.google.common.collect.Sets; /** * Implementation of NiFiServiceFacade that performs revision checking. @@ -256,10 +258,52 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private Authorizer authorizer; private AuthorizableLookup authorizableLookup; + private DistributedLockingManager lockManager; // ----------------------------------------- // Synchronization methods // ----------------------------------------- + @Override + public String obtainReadLock() { + return lockManager.getReadLock().lock(); + } + + @Override + public String obtainReadLock(final String versionIdSeed) { + return lockManager.getReadLock().lock(versionIdSeed); + } + + @Override + public T withReadLock(final String versionIdentifier, final Supplier action) throws LockExpiredException { + return lockManager.getReadLock().withLock(versionIdentifier, action); + } + + @Override + public void releaseReadLock(final String versionIdentifier) throws LockExpiredException { + lockManager.getReadLock().unlock(versionIdentifier); + } + + @Override + public String obtainWriteLock() { + return lockManager.getWriteLock().lock(); + } + + @Override + public String obtainWriteLock(final String versionIdSeed) { + return lockManager.getWriteLock().lock(versionIdSeed); + } + + @Override + public T withWriteLock(final String versionIdentifier, final Supplier action) throws LockExpiredException { + return lockManager.getWriteLock().withLock(versionIdentifier, action); + } + + @Override + public void releaseWriteLock(final String versionIdentifier) throws LockExpiredException { + lockManager.getWriteLock().unlock(versionIdentifier); + } + + @Override public void authorizeAccess(final AuthorizeAccess authorizeAccess) { @@ -267,39 +311,26 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void claimRevision(final Revision revision, final NiFiUser user) { - revisionManager.requestClaim(revision, user); + public void verifyRevision(final Revision revision, final NiFiUser user) { + final Revision curRevision = revisionManager.getRevision(revision.getComponentId()); + if (revision.equals(curRevision)) { + return; + } + + throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified"); } @Override - public void claimRevisions(final Set revisions, final NiFiUser user) { - revisionManager.requestClaim(revisions, user); - } - - @Override - public void cancelRevision(final Revision revision) { - revisionManager.cancelClaim(revision); - } - - @Override - public void cancelRevisions(final Set revisions) { - revisionManager.cancelClaims(revisions); - } - - @Override - public void releaseRevisionClaim(final Revision revision, final NiFiUser user) throws InvalidRevisionException { - revisionManager.releaseClaim(new StandardRevisionClaim(revision), user); - } - - @Override - public void releaseRevisionClaims(final Set revisions, final NiFiUser user) throws InvalidRevisionException { - revisionManager.releaseClaim(new StandardRevisionClaim(revisions), user); + public void verifyRevisions(final Set revisions, final NiFiUser user) { + for (final Revision revision : revisions) { + verifyRevision(revision, user); + } } @Override public Set getRevisionsFromGroup(final String groupId, final Function> getComponents) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - final Set componentIds = revisionManager.get(group.getIdentifier(), rev -> getComponents.apply(group)); + final Set componentIds = getComponents.apply(group); return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); } @@ -334,17 +365,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyUpdateConnection(final ConnectionDTO connectionDTO) { - try { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (connectionDAO.hasConnection(connectionDTO.getId())) { - connectionDAO.verifyUpdate(connectionDTO); - } else { - connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO); - } - } catch (final Exception e) { - revisionManager.cancelClaim(connectionDTO.getId()); - throw e; + // if connection does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (connectionDAO.hasConnection(connectionDTO.getId())) { + connectionDAO.verifyUpdate(connectionDTO); + } else { + connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO); } } @@ -360,15 +386,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyUpdateInputPort(final PortDTO inputPortDTO) { - try { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (inputPortDAO.hasPort(inputPortDTO.getId())) { - inputPortDAO.verifyUpdate(inputPortDTO); - } - } catch (final Exception e) { - revisionManager.cancelClaim(inputPortDTO.getId()); - throw e; + // if connection does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (inputPortDAO.hasPort(inputPortDTO.getId())) { + inputPortDAO.verifyUpdate(inputPortDTO); } } @@ -379,15 +400,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyUpdateOutputPort(final PortDTO outputPortDTO) { - try { - // if connection does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (outputPortDAO.hasPort(outputPortDTO.getId())) { - outputPortDAO.verifyUpdate(outputPortDTO); - } - } catch (final Exception e) { - revisionManager.cancelClaim(outputPortDTO.getId()); - throw e; + // if connection does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (outputPortDAO.hasPort(outputPortDTO.getId())) { + outputPortDAO.verifyUpdate(outputPortDTO); } } @@ -398,15 +414,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyUpdateProcessor(final ProcessorDTO processorDTO) { - try { - // if group does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (processorDAO.hasProcessor(processorDTO.getId())) { - processorDAO.verifyUpdate(processorDTO); - } - } catch (final Exception e) { - revisionManager.cancelClaim(processorDTO.getId()); - throw e; + // if group does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (processorDAO.hasProcessor(processorDTO.getId())) { + processorDAO.verifyUpdate(processorDTO); } } @@ -417,12 +428,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set componentIds) { - try { - processGroupDAO.verifyScheduleComponents(groupId, state, componentIds); - } catch (final Exception e) { - componentIds.forEach(id -> revisionManager.cancelClaim(id)); - throw e; - } + processGroupDAO.verifyScheduleComponents(groupId, state, componentIds); } @Override @@ -432,36 +438,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) { - try { - // if remote group does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) { - remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO); - } - } catch (final Exception e) { - revisionManager.cancelClaim(remoteProcessGroupDTO.getId()); - throw e; + // if remote group does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) { + remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO); } } @Override public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - try { - remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); - } catch (final Exception e) { - revisionManager.cancelClaim(remoteProcessGroupId); - throw e; - } + remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); } @Override public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - try { - remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); - } catch (final Exception e) { - revisionManager.cancelClaim(remoteProcessGroupId); - throw e; - } + remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO); } @Override @@ -471,26 +462,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) { - try { - // if service does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) { - controllerServiceDAO.verifyUpdate(controllerServiceDTO); - } - } catch (final Exception e) { - revisionManager.cancelClaim(controllerServiceDTO.getId()); - throw e; + // if service does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) { + controllerServiceDAO.verifyUpdate(controllerServiceDTO); } } @Override public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { - try { - controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); - } catch (final Exception e) { - revisionManager.cancelClaim(controllerServiceId); - throw e; - } + controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); } @Override @@ -500,15 +481,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) { - try { - // if tasks does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) { - reportingTaskDAO.verifyUpdate(reportingTaskDTO); - } - } catch (final Exception e) { - revisionManager.cancelClaim(reportingTaskDTO.getId()); - throw e; + // if tasks does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) { + reportingTaskDAO.verifyUpdate(reportingTaskDTO); } } @@ -655,15 +631,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set affectedComponentIds) { - try { - // if snippet does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (snippetDAO.hasSnippet(snippetDto.getId())) { - snippetDAO.verifyUpdateSnippetComponent(snippetDto); - } - } catch (final Exception e) { - affectedComponentIds.forEach(id -> revisionManager.cancelClaim(snippetDto.getId())); - throw e; + // if snippet does not exist, then the update request is likely creating it + // so we don't verify since it will fail + if (snippetDAO.hasSnippet(snippetDto.getId())) { + snippetDAO.verifyUpdateSnippetComponent(snippetDto); } } @@ -875,54 +846,32 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyCanClearProcessorState(final String processorId) { - try { - processorDAO.verifyClearState(processorId); - } catch (final Exception e) { - revisionManager.cancelClaim(processorId); - throw e; - } + processorDAO.verifyClearState(processorId); } @Override public void clearProcessorState(final String processorId) { - clearComponentState(processorId, () -> processorDAO.clearState(processorId)); - } - - private void clearComponentState(final String componentId, final Runnable clearState) { - revisionManager.get(componentId, rev -> { - clearState.run(); - return null; - }); + processorDAO.clearState(processorId); } @Override public void verifyCanClearControllerServiceState(final String controllerServiceId) { - try { - controllerServiceDAO.verifyClearState(controllerServiceId); - } catch (final Exception e) { - revisionManager.cancelClaim(controllerServiceId); - throw e; - } + controllerServiceDAO.verifyClearState(controllerServiceId); } @Override public void clearControllerServiceState(final String controllerServiceId) { - clearComponentState(controllerServiceId, () -> controllerServiceDAO.clearState(controllerServiceId)); + controllerServiceDAO.clearState(controllerServiceId); } @Override public void verifyCanClearReportingTaskState(final String reportingTaskId) { - try { - reportingTaskDAO.verifyClearState(reportingTaskId); - } catch (final Exception e) { - revisionManager.cancelClaim(reportingTaskId); - throw e; - } + reportingTaskDAO.verifyClearState(reportingTaskId); } @Override public void clearReportingTaskState(final String reportingTaskId) { - clearComponentState(reportingTaskId, () -> reportingTaskDAO.clearState(reportingTaskId)); + reportingTaskDAO.clearState(reportingTaskId); } @Override @@ -1066,12 +1015,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyDeleteSnippet(final String snippetId, final Set affectedComponentIds) { - try { - snippetDAO.verifyDeleteSnippetComponents(snippetId); - } catch (final Exception e) { - affectedComponentIds.forEach(id -> revisionManager.cancelClaim(id)); - throw e; - } + snippetDAO.verifyDeleteSnippetComponents(snippetId); } @Override @@ -1229,29 +1173,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { */ private RevisionUpdate createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier daoCreation, final Function dtoCreation) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final String groupId = componentDto.getParentGroupId(); // read lock on the containing group - return revisionManager.get(groupId, rev -> { - // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = revisionManager.requestClaim(revision, user); - try { - // update revision through revision manager - return revisionManager.updateRevision(claim, user, () -> { - // add the component - final C component = daoCreation.get(); + // request claim for component to be created... revision already verified (version == 0) + final RevisionClaim claim = new StandardRevisionClaim(revision); - // save the flow - controllerFacade.save(); + // update revision through revision manager + return revisionManager.updateRevision(claim, user, () -> { + // add the component + final C component = daoCreation.get(); - final D dto = dtoCreation.apply(component); - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate(dto, lastMod); - }); - } finally { - // cancel in case of exception... noop if successful - revisionManager.cancelClaim(revision.getComponentId()); - } + // save the flow + controllerFacade.save(); + + final D dto = dtoCreation.apply(component); + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate(dto, lastMod); }); } @@ -1319,7 +1256,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // validate any processors if (flow.getProcessors() != null) { for (final ProcessorDTO processorDTO : flow.getProcessors()) { - final ProcessorNode processorNode = revisionManager.get(processorDTO.getId(), rev -> processorDAO.getProcessor(processorDTO.getId())); + final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); final Collection validationErrors = processorNode.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List errors = new ArrayList<>(); @@ -1333,7 +1270,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { if (flow.getInputPorts() != null) { for (final PortDTO portDTO : flow.getInputPorts()) { - final Port port = revisionManager.get(portDTO.getId(), rev -> inputPortDAO.getPort(portDTO.getId())); + final Port port = inputPortDAO.getPort(portDTO.getId()); final Collection validationErrors = port.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List errors = new ArrayList<>(); @@ -1347,7 +1284,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { if (flow.getOutputPorts() != null) { for (final PortDTO portDTO : flow.getOutputPorts()) { - final Port port = revisionManager.get(portDTO.getId(), rev -> outputPortDAO.getPort(portDTO.getId())); + final Port port = outputPortDAO.getPort(portDTO.getId()); final Collection validationErrors = port.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List errors = new ArrayList<>(); @@ -1362,8 +1299,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // get any remote process group issues if (flow.getRemoteProcessGroups() != null) { for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) { - final RemoteProcessGroup remoteProcessGroup = revisionManager.get( - remoteProcessGroupDTO.getId(), rev -> remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId())); + final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); if (remoteProcessGroup.getAuthorizationIssue() != null) { remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue())); @@ -1374,20 +1310,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) { - final FlowDTO flowDto = revisionManager.get(groupId, - rev -> { - // create the new snippet - final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); + // create the new snippet + final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); - // save the flow - controllerFacade.save(); + // save the flow + controllerFacade.save(); - // drop the snippet - snippetDAO.dropSnippet(snippetId); + // drop the snippet + snippetDAO.dropSnippet(snippetId); - // post process new flow snippet - return postProcessNewFlowSnippet(groupId, snippet); - }); + // post process new flow snippet + final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); final FlowEntity flowEntity = new FlowEntity(); flowEntity.setFlow(flowDto); @@ -1396,17 +1329,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public SnippetEntity createSnippet(final SnippetDTO snippetDTO) { - final String groupId = snippetDTO.getParentGroupId(); - final RevisionUpdate snapshot = revisionManager.get(groupId, rev -> { - // add the component - final Snippet snippet = snippetDAO.createSnippet(snippetDTO); + // add the component + final Snippet snippet = snippetDAO.createSnippet(snippetDTO); - // save the flow - controllerFacade.save(); + // save the flow + controllerFacade.save(); - final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); - return new StandardRevisionUpdate(dto, null); - }); + final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); + final RevisionUpdate snapshot = new StandardRevisionUpdate(dto, null); return entityFactory.createSnippetEntity(snapshot.getComponent()); } @@ -1558,27 +1488,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { .map(label -> label.getId()) .forEach(id -> identifiers.add(id)); - return revisionManager.get(identifiers, - () -> { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); - return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager); - }); + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); + return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager); } @Override public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId, final String idGenerationSeed) { - final FlowDTO flowDto = revisionManager.get(groupId, rev -> { - // instantiate the template - there is no need to make another copy of the flow snippet since the actual template - // was copied and this dto is only used to instantiate it's components (which as already completed) - final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed); + // instantiate the template - there is no need to make another copy of the flow snippet since the actual template + // was copied and this dto is only used to instantiate it's components (which as already completed) + final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed); - // save the flow - controllerFacade.save(); + // save the flow + controllerFacade.save(); - // post process the new flow snippet - return postProcessNewFlowSnippet(groupId, snippet); - }); + // post process the new flow snippet + final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snippet); final FlowEntity flowEntity = new FlowEntity(); flowEntity.setFlow(flowDto); @@ -1603,11 +1528,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final NiFiUser user = NiFiUserUtils.getNiFiUser(); // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = revisionManager.requestClaim(revision, user); + final RevisionClaim claim = new StandardRevisionClaim(revision); final RevisionUpdate snapshot; if (groupId == null) { - try { // update revision through revision manager snapshot = revisionManager.updateRevision(claim, user, () -> { // Unfortunately, we can not use the createComponent() method here because createComponent() wants to obtain the read lock @@ -1620,26 +1544,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate(dto, lastMod); }); - } finally { - // cancel in case of exception... noop if successful - revisionManager.cancelClaim(revision.getComponentId()); - } } else { - snapshot = revisionManager.get(groupId, groupRevision -> { - try { - return revisionManager.updateRevision(claim, user, () -> { - final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); - final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); + snapshot = revisionManager.updateRevision(claim, user, () -> { + final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO); + final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerService); - controllerFacade.save(); + controllerFacade.save(); - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate(dto, lastMod); - }); - } finally { - // cancel in case of exception... noop if successful - revisionManager.cancelClaim(revision.getComponentId()); - } + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate(dto, lastMod); }); } @@ -1729,13 +1642,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // TODO remove once we can update a read lock referencingIds.removeAll(lockedIds); - return revisionManager.get(referencingIds, () -> { - final Map referencingRevisions = new HashMap<>(); - for (final ConfiguredComponent component : reference.getReferencingComponents()) { - referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); - } - return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions); - }); + final Map referencingRevisions = new HashMap<>(); + for (final ConfiguredComponent component : reference.getReferencingComponents()) { + referencingRevisions.put(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); + } + return createControllerServiceReferencingComponentsEntity(reference, referencingRevisions); } /** @@ -1818,29 +1729,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final NiFiUser user = NiFiUserUtils.getNiFiUser(); // request claim for component to be created... revision already verified (version == 0) - final RevisionClaim claim = revisionManager.requestClaim(revision, user); - try { - // update revision through revision manager - final RevisionUpdate snapshot = revisionManager.updateRevision(claim, user, () -> { - // create the reporting task - final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO); + final RevisionClaim claim = new StandardRevisionClaim(revision); - // save the update - controllerFacade.save(); + // update revision through revision manager + final RevisionUpdate snapshot = revisionManager.updateRevision(claim, user, () -> { + // create the reporting task + final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO); - final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); - final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate(dto, lastMod); - }); + // save the update + controllerFacade.save(); - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(reportingTask); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); - return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, bulletins); - } finally { - // cancel in case of exception... noop if successful - revisionManager.cancelClaim(revision.getComponentId()); - } + final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); + final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); + return new StandardRevisionUpdate(dto, lastMod); + }); + + final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(reportingTask); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); + return entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, bulletins); } @Override @@ -1971,47 +1878,32 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ComponentStateDTO getProcessorState(final String processorId) { - return revisionManager.get(processorId, new ReadOnlyRevisionCallback() { - @Override - public ComponentStateDTO withRevision(final Revision revision) { - final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; - final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); + final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; + final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); - // processor will be non null as it was already found when getting the state - final ProcessorNode processor = processorDAO.getProcessor(processorId); - return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState); - } - }); + // processor will be non null as it was already found when getting the state + final ProcessorNode processor = processorDAO.getProcessor(processorId); + return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState); } @Override public ComponentStateDTO getControllerServiceState(final String controllerServiceId) { - return revisionManager.get(controllerServiceId, new ReadOnlyRevisionCallback() { - @Override - public ComponentStateDTO withRevision(final Revision revision) { - final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; - final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); + final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; + final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); - // controller service will be non null as it was already found when getting the state - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState); - } - }); + // controller service will be non null as it was already found when getting the state + final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); + return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState); } @Override public ComponentStateDTO getReportingTaskState(final String reportingTaskId) { - return revisionManager.get(reportingTaskId, new ReadOnlyRevisionCallback() { - @Override - public ComponentStateDTO withRevision(final Revision revision) { - final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; - final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); + final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; + final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); - // reporting task will be non null as it was already found when getting the state - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); - return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState); - } - }); + // reporting task will be non null as it was already found when getting the state + final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); + return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState); } @Override @@ -2029,33 +1921,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return countersDto; } + private ConnectionEntity createConnectionEntity(final Connection connection) { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier())); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connection); + final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier())); + return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, accessPolicy, status); + } + @Override public Set getConnections(final String groupId) { - return revisionManager.get(groupId, rev -> { - final Set connections = connectionDAO.getConnections(groupId); - final Set connectionIds = connections.stream().map(connection -> connection.getIdentifier()).collect(Collectors.toSet()); - return revisionManager.get(connectionIds, () -> { - return connections.stream() - .map(connection -> { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(connection.getIdentifier())); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connection); - final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier())); - return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revision, accessPolicy, status); - }) - .collect(Collectors.toSet()); - }); - }); + final Set connections = connectionDAO.getConnections(groupId); + return connections.stream() + .map(connection -> createConnectionEntity(connection)) + .collect(Collectors.toSet()); } @Override public ConnectionEntity getConnection(final String connectionId) { - return revisionManager.get(connectionId, rev -> { - final Connection connection = connectionDAO.getConnection(connectionId); - final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connection); - final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId)); - return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connectionDAO.getConnection(connectionId)), revision, accessPolicy, status); - }); + final Connection connection = connectionDAO.getConnection(connectionId); + return createConnectionEntity(connection); } @Override @@ -2086,30 +1970,28 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConnectionStatusDTO getConnectionStatus(final String connectionId) { - return revisionManager.get(connectionId, rev -> dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId))); + return dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId)); } @Override public StatusHistoryDTO getConnectionStatusHistory(final String connectionId) { - return revisionManager.get(connectionId, rev -> controllerFacade.getConnectionStatusHistory(connectionId)); + return controllerFacade.getConnectionStatusHistory(connectionId); + } + + private ProcessorEntity createProcessorEntity(final ProcessorNode processor) { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); + final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); + return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, accessPolicy, status, bulletins); } @Override public Set getProcessors(final String groupId) { final Set processors = processorDAO.getProcessors(groupId); - final Set ids = processors.stream().map(proc -> proc.getIdentifier()).collect(Collectors.toSet()); - return revisionManager.get(ids, () -> { - return processors.stream() - .map(processor -> { - final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); - final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); - final List bulletins = - dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); - return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, accessPolicy, status, bulletins); - }) - .collect(Collectors.toSet()); - }); + return processors.stream() + .map(processor -> createProcessorEntity(processor)) + .collect(Collectors.toSet()); } @Override @@ -2164,13 +2046,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessorEntity getProcessor(final String id) { - return revisionManager.get(id, rev -> { - final ProcessorNode processor = processorDAO.getProcessor(id); - final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); - final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id)); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(id)); - return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, dtoFactory.createAccessPolicyDto(processor), status, bulletins); - }); + final ProcessorNode processor = processorDAO.getProcessor(id); + return createProcessorEntity(processor); } @Override @@ -2188,7 +2065,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessorStatusDTO getProcessorStatus(final String id) { - return revisionManager.get(id, rev -> dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id))); + return dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id)); } @Override @@ -2270,46 +2147,33 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // serialize the input ports this NiFi has access to final Set inputPortDtos = new LinkedHashSet<>(); final Set inputPorts = controllerFacade.getInputPorts(); - final Set inputPortIds = inputPorts.stream().map(port -> port.getIdentifier()).collect(Collectors.toSet()); - revisionManager.get(inputPortIds, () -> { - for (final RootGroupPort inputPort : inputPorts) { - if (isUserAuthorized(user, inputPort)) { - final PortDTO dto = new PortDTO(); - dto.setId(inputPort.getIdentifier()); - dto.setName(inputPort.getName()); - dto.setComments(inputPort.getComments()); - dto.setState(inputPort.getScheduledState().toString()); - inputPortDtos.add(dto); - } + for (final RootGroupPort inputPort : inputPorts) { + if (isUserAuthorized(user, inputPort)) { + final PortDTO dto = new PortDTO(); + dto.setId(inputPort.getIdentifier()); + dto.setName(inputPort.getName()); + dto.setComments(inputPort.getComments()); + dto.setState(inputPort.getScheduledState().toString()); + inputPortDtos.add(dto); } - return null; - }); + } // serialize the output ports this NiFi has access to final Set outputPortDtos = new LinkedHashSet<>(); - final Set outputPorts = controllerFacade.getOutputPorts(); - final Set outputPortIds = outputPorts.stream().map(port -> port.getIdentifier()).collect(Collectors.toSet()); - revisionManager.get(outputPortIds, () -> { - for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) { - if (isUserAuthorized(user, outputPort)) { - final PortDTO dto = new PortDTO(); - dto.setId(outputPort.getIdentifier()); - dto.setName(outputPort.getName()); - dto.setComments(outputPort.getComments()); - dto.setState(outputPort.getScheduledState().toString()); - outputPortDtos.add(dto); - } + for (final RootGroupPort outputPort : controllerFacade.getOutputPorts()) { + if (isUserAuthorized(user, outputPort)) { + final PortDTO dto = new PortDTO(); + dto.setId(outputPort.getIdentifier()); + dto.setName(outputPort.getName()); + dto.setComments(outputPort.getComments()); + dto.setState(outputPort.getScheduledState().toString()); + outputPortDtos.add(dto); } - - return null; - }); + } // get the root group - final String rootGroupId = controllerFacade.getRootGroupId(); - final ProcessGroupCounts counts = revisionManager.get(rootGroupId, rev -> { - final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); - return rootGroup.getCounts(); - }); + final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); + final ProcessGroupCounts counts = rootGroup.getCounts(); // create the controller dto final ControllerDTO controllerDTO = new ControllerDTO(); @@ -2340,12 +2204,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ControllerConfigurationEntity getControllerConfiguration() { - return revisionManager.get(FlowController.class.getSimpleName(), rev -> { - final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(controllerFacade); - final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); - return entityFactory.createControllerConfigurationEntity(dto, revision, accessPolicy); - }); + final Revision rev = revisionManager.getRevision(FlowController.class.getSimpleName()); + final ControllerConfigurationDTO dto = dtoFactory.createControllerConfigurationDto(controllerFacade); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(controllerFacade); + final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); + return entityFactory.createControllerConfigurationEntity(dto, revision, accessPolicy); } @Override @@ -2358,246 +2221,198 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) { - AccessPolicy preRevisionRequestAccessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); - Set ids = Stream.concat(Stream.of(accessPolicyId), - Stream.concat(preRevisionRequestAccessPolicy.getUsers().stream(), preRevisionRequestAccessPolicy.getGroups().stream())).collect(Collectors.toSet()); - return revisionManager.get(ids, () -> { - final RevisionDTO requestedAccessPolicyRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicyId)); - final AccessPolicy requestedAccessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(authorizableLookup.getAccessPolicyAuthorizable(accessPolicyId)); - return entityFactory.createAccessPolicyEntity( - dtoFactory.createAccessPolicyDto(requestedAccessPolicy, - requestedAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity()).collect(Collectors.toSet()), - requestedAccessPolicy.getUsers().stream().map(mapUserIdToTenantEntity()).collect(Collectors.toSet())), - requestedAccessPolicyRevision, accessPolicy); - }); + final RevisionDTO requestedAccessPolicyRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(accessPolicyId)); + final AccessPolicy requestedAccessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(authorizableLookup.getAccessPolicyAuthorizable(accessPolicyId)); + return entityFactory.createAccessPolicyEntity( + dtoFactory.createAccessPolicyDto(requestedAccessPolicy, + requestedAccessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity()).collect(Collectors.toSet()), + requestedAccessPolicy.getUsers().stream().map(mapUserIdToTenantEntity()).collect(Collectors.toSet())), + requestedAccessPolicyRevision, accessPolicy); } @Override public UserEntity getUser(final String userId) { final Authorizable usersAuthorizable = authorizableLookup.getTenantAuthorizable(); - Set ids = Stream.concat(Stream.of(userId), userDAO.getUser(userId).getGroups().stream()).collect(Collectors.toSet()); - return revisionManager.get(ids, () -> { - final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId)); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(usersAuthorizable); - final User user = userDAO.getUser(userId); - final Set userGroups = user.getGroups().stream() - .map(mapUserGroupIdToTenantEntity()).collect(Collectors.toSet()); - return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups), userRevision, accessPolicy); - }); + final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId)); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(usersAuthorizable); + final User user = userDAO.getUser(userId); + final Set userGroups = user.getGroups().stream() + .map(mapUserGroupIdToTenantEntity()).collect(Collectors.toSet()); + return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups), userRevision, accessPolicy); + } + + private UserEntity createUserEntity(final User user) { + final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier())); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(authorizableLookup.getTenantAuthorizable()); + final Set userGroups = user.getGroups().stream().map(mapUserGroupIdToTenantEntity()).collect(Collectors.toSet()); + return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups), userRevision, accessPolicy); } @Override public Set getUsers() { final Set users = userDAO.getUsers(); - final Set ids = users.stream().flatMap(user -> Stream.concat(Stream.of(user.getIdentifier()), user.getGroups().stream())).collect(Collectors.toSet()); - return revisionManager.get(ids, () -> { - return users.stream() - .map(user -> { - final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(user.getIdentifier())); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(authorizableLookup.getTenantAuthorizable()); - final Set userGroups = user.getGroups().stream().map(mapUserGroupIdToTenantEntity()).collect(Collectors.toSet()); - return entityFactory.createUserEntity(dtoFactory.createUserDto(user, userGroups), userRevision, accessPolicy); - }).collect(Collectors.toSet()); - }); + return users.stream() + .map(user -> createUserEntity(user)) + .collect(Collectors.toSet()); + } + + private UserGroupEntity createUserGroupEntity(final Group userGroup) { + final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier())); + final Set users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity()).collect(Collectors.toSet()); + return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users), userGroupRevision, + dtoFactory.createAccessPolicyDto(authorizableLookup.getTenantAuthorizable())); } @Override public UserGroupEntity getUserGroup(final String userGroupId) { - Set ids = Stream.concat(Stream.of(userGroupId), userGroupDAO.getUserGroup(userGroupId).getUsers().stream()).collect(Collectors.toSet()); - return revisionManager.get(ids, () -> { - final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroupId)); - final Group userGroup = userGroupDAO.getUserGroup(userGroupId); - final Set users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity()).collect(Collectors.toSet()); - return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users), userGroupRevision, - dtoFactory.createAccessPolicyDto(authorizableLookup.getTenantAuthorizable())); - }); + final Group userGroup = userGroupDAO.getUserGroup(userGroupId); + return createUserGroupEntity(userGroup); } @Override public Set getUserGroups() { - final Authorizable userGroupAuthorizable = authorizableLookup.getTenantAuthorizable(); final Set userGroups = userGroupDAO.getUserGroups(); - final Set ids = userGroups.stream().flatMap(userGroup -> Stream.concat(Stream.of(userGroup.getIdentifier()), userGroup.getUsers().stream())).collect(Collectors.toSet()); - return revisionManager.get(ids, () -> { - return userGroups.stream() - .map(userGroup -> { - final RevisionDTO userGroupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userGroup.getIdentifier())); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(userGroupAuthorizable); - final Set users = userGroup.getUsers().stream().map(mapUserIdToTenantEntity()).collect(Collectors.toSet()); - return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, users), userGroupRevision, accessPolicy); - }).collect(Collectors.toSet()); - }); + return userGroups.stream() + .map(userGroup -> createUserGroupEntity(userGroup)) + .collect(Collectors.toSet()); + } + + private LabelEntity createLabelEntity(final Label label) { + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier())); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(label); + return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), revision, accessPolicy); } @Override public Set getLabels(final String groupId) { final Set