NIFI-2170: Refactor RevisionManager into a RevisionManager and a DistributedLockingManager. This closes #610

This commit is contained in:
Mark Payne 2016-07-05 13:02:03 -04:00 committed by Matt Gilman
parent 181386b943
commit f4c94e349c
20 changed files with 1562 additions and 2240 deletions

View File

@ -38,7 +38,11 @@ public interface RequestReplicator {
public static final String NODE_CONTINUE = "150-NodeContinue";
public static final int NODE_CONTINUE_STATUS_CODE = 150;
public static final String CLAIM_CANCEL_HEADER = "X-Cancel-Claim";
/**
* Indicates that the request is intended to cancel a lock that was previously obtained without performing the action
*/
public static final String LOCK_CANCELATION_HEADER = "X-Cancel-Lock";
public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id";
/**
* Stops the instance from replicating requests. Calling this method on a stopped instance has no effect.

View File

@ -325,6 +325,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
// Add the Lock Version ID to the headers so that it is used in all requests for this transaction
final String lockVersionId = UUID.randomUUID().toString();
headers.put(RequestReplicator.LOCK_VERSION_ID_HEADER, lockVersionId);
final Map<String, String> updatedHeaders = new HashMap<>(headers);
updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
@ -361,20 +365,21 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
return;
}
final Thread cancelClaimThread = new Thread(new Runnable() {
final Map<String, String> cancelLockHeaders = new HashMap<>(updatedHeaders);
cancelLockHeaders.put(LOCK_CANCELATION_HEADER, "true");
final Thread cancelLockThread = new Thread(new Runnable() {
@Override
public void run() {
logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
updatedHeaders.put(CLAIM_CANCEL_HEADER, "true");
final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, null);
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
}
});
cancelClaimThread.setName("Cancel Claims");
cancelClaimThread.start();
cancelLockThread.setName("Cancel Flow Locks");
cancelLockThread.start();
// Add a NodeResponse for each node to the Cluster Response
// Check that all nodes responded successfully.

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public interface DistributedLock {
/**
* Obtains a lock, blocking as long as necessary to obtain the lock.
* Once a lock has been obtained, the identifier of the version of the lock is returned,
* which can be passed to the {@link #withLock(String, Callable)} or
* {@link #unlock(String)} method. Once this method returns, it is
* important that either the {@link #withLock(String, Callable)} or
* {@link #unlock(String)} method be called with this identifier. Otherwise,
* any attempt to claim another read lock or write lock will block until this
* lock expires.
*
* @return the identifier
*/
String lock();
/**
* Obtains a lock, blocking as long as necessary to obtain the lock.
* Once a lock has been obtained, the identifier of the version of the lock is returned,
* which can be passed to the {@link #withLock(String, Callable)} or
* {@link #unlock(String)} method. Once this method returns, it is
* important that either the {@link #withLock(String, Callable)} or
* {@link #unlock(String)} method be called with this identifier. Otherwise,
* any attempt to claim another read lock or write lock will block until this
* lock expires.
*
* @param versionIdentifier a value that should be used as the version id instead of generating one.
* This allows us to ensure that all nodes in the cluster use the same id.
*
* @return the identifier
*/
String lock(String versionIdentifier);
/**
* Waits up to the given amount of time to obtain a lock. If the lock is obtained
* within this time period, the identifier will be returned, as with {@link #lock()}.
* If the lock cannot be obtained within the given time period, <code>null</code> will
* be returned.
*
* @param time the maximum amount of time to wait for the lock
* @param timeUnit the unit of time that the time parameter is in
* @return the identifier of the lock, or <code>null</code> if no lock is obtained
*/
String tryLock(long time, TimeUnit timeUnit);
/**
* Waits up to the given amount of time to obtain a lock. If the lock is obtained
* within this time period, the identifier will be returned, as with {@link #lock()}.
* If the lock cannot be obtained within the given time period, <code>null</code> will
* be returned.
*
* @param time the maximum amount of time to wait for the lock
* @param timeUnit the unit of time that the time parameter is in
* @param versionIdentifier a value that should be used as the version id instead of generating one.
* This allows us to ensure that all nodes in the cluster use the same id.
* @return the identifier of the lock, or <code>null</code> if no lock is obtained
*/
String tryLock(long time, TimeUnit timeUnit, String versionIdentifier);
/**
* Performs the given action while this lock is held. The identifier of the lock that was
* obtained by calling {@link #lock()} must be provided to this method. If the
* lock identifier is incorrect, or the lock has expired, a {@link LockExpiredException}
* will be thrown. This method provides a mechanism for verifying that the lock obtained
* by {@link #lock()} or {@link #tryLock(long, TimeUnit)} is still valid and that the action
* being performed will be done so without the lock expiring (i.e., if the lock expires while
* the action is being performed, the lock won't be released until the provided action completes).
*
* @param identifier the identifier of the lock that has already been obtained
* @param action the action to perform
*
* @return the value returned by the given action
*
* @throws LockExpiredException if the provided identifier is not the identifier of the currently
* held lock, or if the lock that was obtained has already expired and is no longer valid
*/
<T> T withLock(String identifier, Supplier<T> action) throws LockExpiredException;
/**
* Cancels the lock with the given identifier, so that the lock is no longer valid.
*
* @param identifier the identifier of the lock that was obtained by calling {@link #lock()}.
*
* @throws LockExpiredException if the provided identifier is not the identifier of the currently
* held lock, or if the lock that was obtained has already expired and is no longer valid
*/
void unlock(String identifier) throws LockExpiredException;
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
/**
* <p>
* A DistributedLockingManager is responsible for exposing a mechanism that
* clients can use to obtain a lock on the dataflow.
* </p>
*
* <p>
* Because of the way in which NiFi replicates requests from one node to all
* other nodes in the cluster, it is important that all nodes in the cluster
* are able to obtain a lock for the request before the request is allowed to
* proceed. This is accomplished by using a two-phase approach. For each request
* that will require a lock (either a read (shared) lock or a write (mutually
* exclusive) lock), the request must be done in two phases. The first phase is
* responsible for obtaining the lock and optionally performing validation of
* the request. Once a node has obtained the necessary lock and performed any
* required validation, the node will respond to the web request with a status
* code of 150 - NodeContinue.
* </p>
*
* <p>
* At this point, the node that originated the request
* will verify that either all nodes obtained a lock or that at least one node
* failed to obtain a lock. If all nodes respond with a 150 - NodeContinue,
* then the second phase of the request will occur. In the second phase, the
* actual logic of the desired request is performed while the lock is held.
* The lock is then released, once the logic is performed (or if the logic fails
* to be performed).
* </p>
*
* <p>
* In the case that at least one node responds with a status code with than
* 150 - NodeContinue, the node that originated the request will instead issue
* a cancel request for the second phase so that all nodes are able to unlock
* the lock that was previously obtained for the request.
* </p>
*
* <p>
* A key consideration in this type of approach that must be taken into account
* is that the node that originated the request could, at any point in time, fail
* as a result of the process being killed, power loss, network connectivity problems,
* etc. As a result, the locks that are obtained through a DistributedLockingManager
* are designed to expire after some amount of time, so that locks are not held
* indefinitely, even in the case of node failure.
* </p>
*/
public interface DistributedLockingManager {
DistributedLock getReadLock();
DistributedLock getWriteLock();
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
public class LockExpiredException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockExpiredException(String message) {
super(message);
}
}

View File

@ -19,18 +19,16 @@ package org.apache.nifi.web.revision;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.concurrent.DistributedLockingManager;
/**
* <p>
* A Revision Manager provides the ability to prevent clients of the Web API from
* stepping on one another. This is done by providing claims and locking mechanisms
* stepping on one another. This is done by providing revisions
* for components individually.
* </p>
*
@ -45,68 +43,27 @@ import org.apache.nifi.web.Revision;
*
* <p>
* When the first phase of the two-phase commit is processed, the Revision Manager should
* be used to obtain a Revision Claim by calling the {@link #requestClaim(Collection)}
* method. If a Claim is granted, then the request validation may continue. If the
* Claim is not granted, the request should fail and the second phase should not
* be performed.
* be used to verify that the client-provided Revisions are current by calling the
* {@link #verifyRevisions(Collection)}
* method. If the revisions are up-to-date, the method will return successfully and the
* request validation may continue. Otherwise, the request should fail and the second phase
* should not be performed.
* </p>
*
* <p>
* If the first phase of the above two-phase commit completes and all nodes indicate that the
* request may continue, this means that all nodes have provided granted a Claim on the Revisions
* that are relevant. This Claim will automatically expire after some time. This expiration
* means that if the node that issues the first phase never initiates the second phase (if the node
* dies or loses network connectivitiy, for instance), then the Revision Claim will expire and
* the Revision will remain unchanged.
* request may continue, this means that all nodes have agreed that the client's Revisios are
* acceptable.
* </p>
*
* <p>
* When the second phase begins, changes to the resource(s) must be made with the Revisions
* locked. This is accomplished by wrapping the logic in a {@link Runnable} and passing the Runnable,
* along with the {@link RevisionClaim} to the {@link #updateRevision(RevisionClaim, Supplier)} method.
* To ensure that the revisions remain consistent between the time that they are validated and
* the time that the modification takes place, it is important that the revisions always be
* validated while an appropriate read or write lock is held, via the {@link DistributedLockingManager}.
* </p>
*/
public interface RevisionManager {
/**
* <p>
* Attempts to obtain a Revision Claim for Revisions supplied. If a Revision Claim
* is granted, no other thread will be allowed to modify any of the components for
* which a Revision is claimed until either the Revision Claim is relinquished by
* calling the {@link #updateRevision(RevisionClaim, Runnable)} method or the
* {@link #releaseClaim(RevisionClaim)} method, or the Revision Claim expires.
* </p>
*
* <p>
* This method is atomic. If a Revision Claim is unable to be obtained for any of the
* provided Revisions, then no Revision Claim will be obtained.
* </p>
*
* @param revisions a Set of Revisions, each of which corresponds to a different
* component for which a Claim is to be acquired.
* @param user the user for which the claim is being requested
*
* @return the Revision Claim that was granted, if one was granted.
*
* @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
*/
RevisionClaim requestClaim(Collection<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
/**
* <p>
* A convenience method that will call {@link #requestClaim(Collection)} by wrapping the given
* Revision in a Collection
* </p>
*
* @param revision the revision to request a claim for
* @param user the user for which the claim is being requested
*
* @return the Revision Claim that was granted, if one was granted.
*
* @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
*/
RevisionClaim requestClaim(Revision revision, NiFiUser user) throws InvalidRevisionException;
/**
* Returns the current Revision for the component with the given ID. If no Revision yet exists for the
* component with the given ID, one will be created with a Version of 0 and no Client ID.
@ -135,7 +92,7 @@ public interface RevisionManager {
*
* @throws ExpiredRevisionClaimException if the Revision Claim has expired
*/
<T> RevisionUpdate<T> updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException;
<T> RevisionUpdate<T> updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask<T> task);
/**
* Performs the given task that is expected to remove a component from the flow. As a result,
@ -151,64 +108,6 @@ public interface RevisionManager {
*/
<T> T deleteRevision(RevisionClaim claim, NiFiUser user, DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException;
/**
* Performs some operation to obtain an object of type T whose identifier is provided via
* the componentId argument, and return that object of type T while holding a Read Lock on
* the Revision for that object. Note that the callback provided must never modify the object
* with the given ID.
*
* @param callback the callback that is to be performed with the Read Lock held
* @return the value returned from the callback
*/
<T> T get(String componentId, ReadOnlyRevisionCallback<T> callback);
/**
* Performs some operation to obtain an object of type T whose identifier is provided via
* the componentId argument, and return that object of type T while holding a Read Lock on
* the Revision for that object. Note that the callback provided must never modify the object
* with the given ID.
*
* @param callback the callback that is to be performed with the Read Lock held
* @return the value returned from the callback
*/
<T> T get(Set<String> componentId, Supplier<T> callback);
/**
* Releases the claims on the revisions held by the given Revision Claim, if all of the Revisions
* are up-to-date.
*
* @param claim the claim that holds the revisions
* @param user the user that is releasing the claim. Must be the same user that claimed the revision.
*
* @return <code>true</code> if the claim was released, <code>false</code> if the Revisions were not
* up-to-date
*/
boolean releaseClaim(RevisionClaim claim, NiFiUser user);
/**
* Releases the claim on the revision for the given component if the claim was obtained by the calling thread
*
* @param componentId the ID of the component
* @return <code>true</code> if the claim was released, false otherwise
*/
boolean cancelClaim(String componentId);
/**
* Releases the claim on the given revision if the claim was obtained by the calling thread
*
* @param revision the Revision to cancel the claim for
* @return <code>true</code> if the claim was released, false otherwise
*/
boolean cancelClaim(Revision revision);
/**
* Releases the claims on the given revisions if the claim was obtained by the calling thread
*
* @param revisions the Revisions to cancel claims for
* @return <code>true</code> if all claims were released, false otherwise
*/
boolean cancelClaims(Set<Revision> revisions);
/**
* Clears any revisions that are currently held and resets the Revision Manager so that the revisions
* present are those provided in the given collection

View File

@ -16,6 +16,14 @@
*/
package org.apache.nifi.web;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.repository.claim.ContentDirection;
@ -90,13 +98,7 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.apache.nifi.web.concurrent.LockExpiredException;
/**
* Defines the NiFiServiceFacade interface.
@ -114,6 +116,104 @@ public interface NiFiServiceFacade {
*/
void authorizeAccess(AuthorizeAccess authorizeAccess);
/**
* Obtains a read (shared) lock for the entire flow, so that no other
* requests can be made to modify the flow until either this read lock
* is released via {@link #releaseReadLock()} or the lock expires
*
* @return an identifier that indicates the version of the lock, so that other
* requests cannot release a lock that was held by this request
*/
String obtainReadLock();
/**
* Obtains a read (shared) lock for the entire flow, so that no other
* requests can be made to modify the flow until either this read lock
* is released via {@link #releaseReadLock()} or the lock expires
*
* @param versionId specifies a value to use for the Version ID for the lock
*
* @return an identifier that indicates the version of the lock, so that other
* requests cannot release a lock that was held by this request
*/
String obtainReadLock(String versionId);
/**
* Performs the given action while holding the read lock that has already been obtained
* with the given versionIdentifier. This allows the given action to be performed without
* allowing the read lock to expire until the entire action has completed.
*
* @param versionIdentifier the identifier that indicates the version of the lock that
* is held. The value that is to be passed here is the value that was returned from the
* call to {@link #obtainReadLock()}.
* @param action the action to perform
*
* @return the value returned by the action
* @throws LockExpiredException if the lock has expired before the action is invoked
* @throws Exception any Exception thrown by the given action is propagated
*/
<T> T withReadLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
/**
* Releases the read lock held on this flow
*
* @param versionIdentifier the identifier that indicates the version of the lock that
* is held. The value that is to be passed here is the value that was returned from the
* call to {@link #obtainReadLock()}.
*
* @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
*/
void releaseReadLock(String versionIdentifier) throws LockExpiredException;
/**
* Obtains a write (mutually exclusive) lock for the entire flow, so that no other
* requests can be made to read or modify the flow until either this write lock
* is released via {@link #releaseWriteLock()} or the lock expires
*
* @return an identifier that indicates the version of the lock, so that other
* requests cannot release a lock that was held by this request
*/
String obtainWriteLock();
/**
* Obtains a write (mutually exclusive) lock for the entire flow, so that no other
* requests can be made to read or modify the flow until either this write lock
* is released via {@link #releaseWriteLock()} or the lock expires
*
* @param versionId specifies a value to use for the Version ID for the lock
*
* @return an identifier that indicates the version of the lock, so that other
* requests cannot release a lock that was held by this request
*/
String obtainWriteLock(String versionId);
/**
* Performs the given action while holding the write lock that has already been obtained
* with the given versionIdentifier. This allows the given action to be performed without
* allowing the write lock to expire until the entire action has completed.
*
* @param versionIdentifier the identifier that indicates the version of the lock that
* is held. The value that is to be passed here is the value that was returned from the
* call to {@link #obtainWriteLock()}.
* @param action the action to perform
*
* @return the value returned by the action
* @throws LockExpiredException if the lock has expired before the action is invoked
* @throws Exception any Exception thrown by the given action is propagated
*/
<T> T withWriteLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
/**
* Releases the write lock held on the flow
*
* @param versionIdentifier the identifier that indicates the version of the lock that
* is held. The value that is to be passed here is the value that was returned from the
* call to {@link #obtainWriteLock()}.
*
* @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
*/
void releaseWriteLock(String versionIdentifier) throws LockExpiredException;
/**
* Claims the specified revision for the specified user.
*
@ -121,7 +221,7 @@ public interface NiFiServiceFacade {
* @param user user
* @throws InvalidRevisionException invalid revision
*/
void claimRevision(Revision revision, NiFiUser user) throws InvalidRevisionException;
void verifyRevision(Revision revision, NiFiUser user) throws InvalidRevisionException;
/**
* Claims the specified revisions for the specified user.
@ -130,41 +230,7 @@ public interface NiFiServiceFacade {
* @param user user
* @throws InvalidRevisionException invalid revision
*/
void claimRevisions(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
/**
* Cancels the specified revision. Cancellation is only supported based on the current thread.
*
* @param revision revision
* @throws InvalidRevisionException invalid revision
*/
void cancelRevision(Revision revision) throws InvalidRevisionException;
/**
* Cancels the specified revisions. Cancellation is only supported based on the current thread.
*
* @param revisions revision
* @throws InvalidRevisionException invalid revision
*/
void cancelRevisions(Set<Revision> revisions) throws InvalidRevisionException;
/**
* Releases the claim that is held on the given revision by the given user
*
* @param revision the revision
* @param user the user
* @throws InvalidRevisionException if the revision is invalid
*/
void releaseRevisionClaim(Revision revision, NiFiUser user) throws InvalidRevisionException;
/**
* Releases the claim that is held on the given revisions by the given user
*
* @param revisions the revisions
* @param user the user
* @throws InvalidRevisionException if the revision is invalid
*/
void releaseRevisionClaims(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
void verifyRevisions(Set<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
/**
* Gets the current revisions for the components based on the specified function.

View File

@ -52,6 +52,7 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.concurrent.LockExpiredException;
import org.apache.nifi.web.util.ClientResponseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -410,17 +411,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
}
processor = entity.getComponent();
} else {
// claim the revision
serviceFacade.claimRevision(revision, user);
// update processor within write lock
final String writeLockId = serviceFacade.obtainWriteLock();
try {
ProcessorDTO processorDTO = buildProcessorDto(id,annotationData,properties);
final ProcessorEntity entity = serviceFacade.updateProcessor(revision,processorDTO);
processor = entity.getComponent();
processor = serviceFacade.withWriteLock(writeLockId, () -> {
ProcessorDTO processorDTO = buildProcessorDto(id, annotationData, properties);
final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
return entity.getComponent();
});
} finally {
// ensure the revision is canceled.. if the operation succeed, this is a noop
serviceFacade.cancelRevision(revision);
// ensure the lock is released
try {
serviceFacade.releaseWriteLock(writeLockId);
} catch (final LockExpiredException e) {
}
}
}
@ -565,15 +569,19 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
controllerServiceDto.setAnnotationData(annotationData);
controllerServiceDto.setProperties(properties);
// claim the revision
serviceFacade.claimRevision(revision, user);
// update controller service within write lock
final String writeLockId = serviceFacade.obtainWriteLock();
try {
// perform the update
controllerService = serviceFacade.withWriteLock(writeLockId, () -> {
final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
controllerService = entity.getComponent();
return entity.getComponent();
});
} finally {
// ensure the revision is canceled.. if the operation succeed, this is a noop
serviceFacade.cancelRevision(revision);
// ensure the lock is released
try {
serviceFacade.releaseWriteLock(writeLockId);
} catch (final LockExpiredException e) {
}
}
} else {
// if this is a standalone instance the service should have been found above... there should
@ -733,14 +741,20 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
reportingTaskDto.setAnnotationData(annotationData);
reportingTaskDto.setProperties(properties);
// claim the revision
serviceFacade.claimRevision(revision, user);
// obtain write lock
final String writeLockId = serviceFacade.obtainWriteLock();
try {
reportingTask = serviceFacade.withWriteLock(writeLockId, () -> {
serviceFacade.verifyRevision(revision, user);
final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
reportingTask = entity.getComponent();
return entity.getComponent();
});
} finally {
// ensure the revision is canceled.. if the operation succeed, this is a noop
serviceFacade.cancelRevision(revision);
// ensure the lock is released
try {
serviceFacade.releaseWriteLock(writeLockId);
} catch (final LockExpiredException e) {
}
}
} else {
// if this is a standalone instance the task should have been found above... there should

View File

@ -16,10 +16,31 @@
*/
package org.apache.nifi.web.api;
import com.sun.jersey.api.core.HttpContext;
import com.sun.jersey.api.representation.Form;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriBuilderException;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
@ -40,32 +61,14 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.concurrent.LockExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriBuilderException;
import javax.ws.rs.core.UriInfo;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import com.sun.jersey.api.core.HttpContext;
import com.sun.jersey.api.representation.Form;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
/**
* Base class for controllers.
@ -343,8 +346,8 @@ public abstract class ApplicationResource {
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
}
protected boolean isClaimCancelationPhase(final HttpServletRequest httpServletRequest) {
return httpServletRequest.getHeader(RequestReplicator.CLAIM_CANCEL_HEADER) != null;
protected boolean isLockCancelationPhase(final HttpServletRequest httpServletRequest) {
return httpServletRequest.getHeader(RequestReplicator.LOCK_CANCELATION_HEADER) != null;
}
/**
@ -444,9 +447,7 @@ public abstract class ApplicationResource {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return withWriteLock(serviceFacade, authorizer, verifier, action,
() -> serviceFacade.claimRevision(revision, user),
() -> serviceFacade.cancelRevision(revision),
() -> serviceFacade.releaseRevisionClaim(revision, user));
() -> serviceFacade.verifyRevision(revision, user));
}
/**
@ -463,9 +464,7 @@ public abstract class ApplicationResource {
final Runnable verifier, final Supplier<Response> action) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return withWriteLock(serviceFacade, authorizer, verifier, action,
() -> serviceFacade.claimRevisions(revisions, user),
() -> serviceFacade.cancelRevisions(revisions),
() -> serviceFacade.releaseRevisionClaims(revisions, user));
() -> serviceFacade.verifyRevisions(revisions, user));
}
@ -476,43 +475,65 @@ public abstract class ApplicationResource {
* @param authorizer authorizer
* @param verifier verifier
* @param action the action to execute
* @param claimRevision a callback that will claim the necessary revisions for the operation
* @param cancelRevision a callback that will cancel the necessary revisions if the operation fails
* @param releaseClaim a callback that will release any previously claimed revision if the operation is canceled after the first phase
* @param verifyRevision a callback that will claim the necessary revisions for the operation
* @return the response
*/
private Response withWriteLock(
final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action,
final Runnable claimRevision, final Runnable cancelRevision, final Runnable releaseClaim) {
final Runnable verifyRevision) {
if (isLockCancelationPhase(httpServletRequest)) {
final String lockVersionId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
try {
serviceFacade.releaseWriteLock(lockVersionId);
} catch (final Exception e) {
// If the lock has expired, then it has already been unlocked.
}
if (isClaimCancelationPhase(httpServletRequest)) {
releaseClaim.run();
return generateOkResponse().build();
}
String lockId = null;
try {
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(authorizer);
claimRevision.run();
lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
lockId = serviceFacade.obtainWriteLock(lockId);
verifyRevision.run();
} else {
lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
}
try {
if (validationPhase) {
if (verifier != null) {
verifier.run();
}
return generateContinueResponse().build();
}
} catch (final Exception e) {
cancelRevision.run();
throw e;
}
try {
return action.get();
return serviceFacade.withWriteLock(lockId, () -> action.get());
} finally {
cancelRevision.run();
try {
serviceFacade.releaseWriteLock(lockId);
} catch (final LockExpiredException e) {
// If the lock expires here, it's okay. We've already completed our action,
// so the expiration of the lock is of no consequence to us.
}
}
} catch (final RuntimeException t) {
if (lockId != null) {
try {
serviceFacade.releaseWriteLock(lockId);
} catch (final Exception e) {
t.addSuppressed(e);
}
}
throw t;
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.config;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.web.concurrent.LockExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Provider
public class LockExpiredExceptionMapper implements ExceptionMapper<LockExpiredException> {
private static final Logger logger = LoggerFactory.getLogger(InvalidRevisionExceptionMapper.class);
@Override
public Response toResponse(LockExpiredException exception) {
// log the error
logger.warn(String.format("%s. Returning %s response.", exception, Response.Status.CONFLICT));
if (logger.isDebugEnabled()) {
logger.debug(StringUtils.EMPTY, exception);
}
return Response.status(Response.Status.CONFLICT).entity(exception.getMessage()).type("text/plain").build();
}
}

View File

@ -33,7 +33,6 @@
<!-- revision manager -->
<bean id="revisionManager" class="org.apache.nifi.web.revision.NaiveRevisionManager">
<constructor-arg ref="nifiProperties"></constructor-arg>
</bean>
<!-- content access -->
@ -164,6 +163,11 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="heartbeatMonitor" ref="heartbeatMonitor" />
<property name="bulletinRepository" ref="bulletinRepository"/>
<property name="lockManager" ref="lockManager" />
</bean>
<bean id="lockManager" class="org.apache.nifi.web.concurrent.DistributedReadWriteLock">
<constructor-arg ref="nifiProperties" />
</bean>
<!-- component ui extension configuration context -->
@ -393,6 +397,7 @@
<bean class="org.apache.nifi.web.api.config.IllegalNodeReconnectionExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.IllegalStateExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.InvalidRevisionExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.LockExpiredExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.JsonMappingExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.JsonParseExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.MutableRequestExceptionMapper" scope="singleton"/>

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
public class DistributedReadWriteLock implements DistributedLockingManager {
private final DistributedLock readLock;
private final DistributedLock writeLock;
public DistributedReadWriteLock(final NiFiProperties properties) {
this(FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT,
NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}
public DistributedReadWriteLock(final long lockExpirationPeriod, final TimeUnit lockExpirationUnit) {
final ReadWriteLockSync sync = new ReadWriteLockSync();
readLock = new ReentrantDistributedLock(LockMode.SHARED, sync, lockExpirationPeriod, lockExpirationUnit);
writeLock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, lockExpirationPeriod, lockExpirationUnit);
}
@Override
public DistributedLock getReadLock() {
return readLock;
}
@Override
public DistributedLock getWriteLock() {
return writeLock;
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
import java.util.concurrent.TimeUnit;
public class LockInfo {
private final String versionId;
private final int lockCount;
private final LockMode lockMode;
private final long expirationTime;
public LockInfo(final String versionId, final LockMode lockMode, final int lockCount, final long expirationPeriod, final TimeUnit expirationUnit) {
this.versionId = versionId;
this.lockMode = lockMode;
this.lockCount = lockCount;
this.expirationTime = System.nanoTime() + expirationUnit.toNanos(expirationPeriod);
}
public boolean isExpired() {
return System.nanoTime() > expirationTime;
}
public String getVersionId() {
return versionId;
}
public int getLockCount() {
return lockCount;
}
public LockMode getLockMode() {
return lockMode;
}
@Override
public String toString() {
return "LockInfo[versionId=" + versionId + ", lockMode=" + lockMode + ", lockCount = " + lockCount + ", expired=" + isExpired() + "]";
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
public enum LockMode {
SHARED,
MUTUALLY_EXCLUSIVE;
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
import java.util.concurrent.atomic.AtomicReference;
public class ReadWriteLockSync {
private final AtomicReference<LockInfo> lockInfoRef = new AtomicReference<>();
public LockInfo get() {
return lockInfoRef.get();
}
public boolean update(final LockInfo currentLock, final LockInfo updatedLock) {
return lockInfoRef.compareAndSet(currentLock, updatedLock);
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReentrantDistributedLock implements DistributedLock {
private static final Logger logger = LoggerFactory.getLogger(ReentrantDistributedLock.class);
private final long expirationNanos;
private final ReadWriteLockSync sync;
private final LockMode lockMode;
public ReentrantDistributedLock(final LockMode lockMode, final ReadWriteLockSync sync, final long expirationTimePeriod, final TimeUnit expirationTimeUnit) {
this.lockMode = lockMode;
this.sync = sync;
this.expirationNanos = expirationTimeUnit.toNanos(expirationTimePeriod);
}
int getClaimCount() {
final LockInfo currentInfo = sync.get();
if (currentInfo == null || currentInfo.isExpired()) {
return 0;
}
return currentInfo.getLockCount();
}
@Override
public String lock() {
return lock(null);
}
@Override
public String lock(final String versionIdentifier) {
return tryLock(-1L, TimeUnit.MILLISECONDS, versionIdentifier);
}
@Override
public String tryLock(final long time, final TimeUnit timeUnit) {
return tryLock(time, timeUnit, null);
}
@Override
public String tryLock(final long timePeriod, final TimeUnit timeUnit, final String versionIdentifier) {
final long stopTryingTime = timePeriod < 0 ? -1L : System.nanoTime() + timeUnit.toNanos(timePeriod);
logger.debug("Attempting to obtain {} lock with a max wait of {} {}", lockMode, timePeriod, timeUnit);
long i = 0;
while (true) {
if (i++ > 0) {
if (stopTryingTime > 0L && System.nanoTime() > stopTryingTime) {
logger.debug("Failed to obtain {} lock within {} {}; returning null for tryLock", lockMode, timePeriod, timeUnit);
return null;
}
// If not the first time we've reached this point, we want to
// give other threads a chance to release their locks before
// we enter the synchronized block.
Thread.yield();
}
synchronized (sync) {
final LockInfo currentInfo = sync.get();
logger.trace("Current Lock Info = {}", currentInfo);
if (currentInfo == null || currentInfo.isExpired()) {
// There is no lock currently held. Attempt to obtain the lock.
final String versionId = versionIdentifier == null ? UUID.randomUUID().toString() : versionIdentifier;
final boolean updated = updateLockInfo(currentInfo, versionId, 1);
if (updated) {
// Lock has been obtained. Return the current version.
logger.debug("Obtained {} lock with Version ID {}", lockMode, versionId);
return versionId;
} else {
// Try again.
logger.debug("Failed to update atomic reference. Trying again");
continue;
}
} else {
// There is already a lock held. If the lock that is being held is SHARED,
// and this is a SHARED lock, then we can use it.
if (lockMode == LockMode.SHARED && currentInfo.getLockMode() == LockMode.SHARED) {
logger.debug("Lock is already held but is a shared lock. Attempting to increment lock count");
// lock being held is a shared lock, and this is a shared lock. We can just
// update the Lock Info by incrementing the lock count and using a new expiration time.
final boolean updated = updateLockInfo(currentInfo, currentInfo.getVersionId(), currentInfo.getLockCount() + 1);
if (updated) {
// lock info was updated. Return the current version.
logger.debug("Incremented lock count. Obtained {} lock with Version ID {}", lockMode, currentInfo.getVersionId());
return currentInfo.getVersionId();
} else {
// failed to update the lock info. The lock has expired, so we have to start over.
logger.debug("Failed to update atomic reference. Trying again");
continue;
}
} else {
// either the lock being held is a mutex or this lock requires a mutex. Either
// way, we cannot enter the lock, so we will wait a bit and then retry.
// We wait before entering synchronized block, because we don't want to overuse
// the CPU and we want to give other threads a chance to unlock the lock.
logger.debug("Cannot obtain {} lock because it is already held and cannot be shared. Trying again", lockMode);
continue;
}
}
}
}
}
protected boolean updateLockInfo(final LockInfo currentInfo, final String versionId, final int lockCount) {
final LockInfo newInfo = new LockInfo(versionId, lockMode, lockCount, expirationNanos, TimeUnit.NANOSECONDS);
return sync.update(currentInfo, newInfo);
}
@Override
public <T> T withLock(final String identifier, final Supplier<T> action) throws LockExpiredException {
synchronized (sync) {
verifyIdentifier(identifier, sync.get());
return action.get();
}
}
@Override
public void unlock(final String identifier) throws LockExpiredException {
synchronized (sync) {
final LockInfo info = sync.get();
verifyIdentifier(identifier, info);
final int newLockCount = info.getLockCount() - 1;
if (newLockCount <= 0) {
sync.update(info, null);
} else {
sync.update(info, new LockInfo(info.getVersionId(), lockMode, newLockCount, expirationNanos, TimeUnit.NANOSECONDS));
}
}
}
private void verifyIdentifier(final String identifier, final LockInfo lockInfo) throws LockExpiredException {
if (lockInfo == null) {
throw new LockExpiredException("No lock has been obtained");
}
if (!lockInfo.getVersionId().equals(identifier)) {
throw new LockExpiredException("Incorrect Lock ID provided. This typically means that the lock has already expired and another lock has been obtained.");
}
if (lockInfo.isExpired()) {
throw new LockExpiredException("Lock has already expired");
}
}
}

View File

@ -17,30 +17,16 @@
package org.apache.nifi.web.revision;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
import org.slf4j.Logger;
@ -58,197 +44,59 @@ import org.slf4j.LoggerFactory;
public class NaiveRevisionManager implements RevisionManager {
private static final Logger logger = LoggerFactory.getLogger(NaiveRevisionManager.class);
private final long claimExpirationNanos;
private final ConcurrentMap<String, RevisionLock> revisionLockMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Revision> revisionMap = new ConcurrentHashMap<>();
public NaiveRevisionManager() {
this(1, TimeUnit.MINUTES);
}
public NaiveRevisionManager(final NiFiProperties properties) {
this(getRequestTimeoutMillis(properties), TimeUnit.MILLISECONDS);
}
/**
* Constructs a new NaiveRevisionManager that uses the given amount of time as the expiration time
* for a Revision Claims
*
* @param claimExpiration how long a Revision Claim should last
* @param timeUnit the TimeUnit of 'claimExpiration'
*/
public NaiveRevisionManager(final long claimExpiration, final TimeUnit timeUnit) {
this.claimExpirationNanos = timeUnit.toNanos(claimExpiration);
}
private static long getRequestTimeoutMillis(final NiFiProperties properties) {
return FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT,
NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS);
}
@Override
public RevisionClaim requestClaim(final Revision revision, final NiFiUser user) throws InvalidRevisionException {
Objects.requireNonNull(user);
return requestClaim(Collections.singleton(revision), user);
}
@Override
public void reset(final Collection<Revision> revisions) {
final Map<String, RevisionLock> copy;
synchronized (this) {
copy = new HashMap<>(revisionLockMap);
revisionLockMap.clear();
synchronized (this) { // avoid allowing two threads to reset versions concurrently
revisionMap.clear();
for (final Revision revision : revisions) {
revisionLockMap.put(revision.getComponentId(), new RevisionLock(new FlowModification(revision, null), claimExpirationNanos));
revisionMap.put(revision.getComponentId(), revision);
}
}
for (final RevisionLock lock : copy.values()) {
lock.clear();
}
}
@Override
public List<Revision> getAllRevisions() {
return revisionLockMap.values().stream()
.map(lock -> lock.getRevision())
.collect(Collectors.toList());
}
@Override
public RevisionClaim requestClaim(final Collection<Revision> revisions, final NiFiUser user) {
Objects.requireNonNull(user);
logger.debug("Attempting to claim Revisions {}", revisions);
// Try to obtain a Revision Claim (temporary lock) on all revisions
final List<Revision> revisionList = new ArrayList<>(revisions);
revisionList.sort(new RevisionComparator());
ClaimResult failedClaimResult = null;
final Set<RevisionLock> locksObtained = new HashSet<>();
for (int i = 0; i < revisionList.size(); i++) {
final Revision revision = revisionList.get(i);
final RevisionLock revisionLock = getRevisionLock(revision);
final ClaimResult claimResult = revisionLock.requestClaim(revision, user);
logger.trace("Obtained Revision Claim for {}", revision);
if (claimResult.isSuccessful()) {
locksObtained.add(revisionLock);
} else {
logger.debug("Failed to obtain Revision Claim for component with ID {} because Current Revision is {} but supplied Revision is {}",
revision.getComponentId(), claimResult.getLastModification().getRevision(), revision);
failedClaimResult = claimResult;
break;
}
}
// if we got a Revision Claim on each Revision, return a successful result
if (locksObtained.size() == revisionList.size()) {
logger.trace("Obtained Revision Claim for all components");
// it's possible that obtaining the locks took a while if we are obtaining
// many. Renew the timestamp to ensure that the first locks obtained don't
// expire too quickly.
final long timestamp = System.nanoTime() + claimExpirationNanos;
for (final RevisionLock revisionLock : locksObtained) {
revisionLock.renewExpiration(timestamp);
}
return new StandardRevisionClaim(revisions);
}
// We failed to obtain all of the Revision Claims necessary. Since
// we need this call to atomically obtain all or nothing, we have to now
// release the locks that we did obtain.
logger.debug("Failed to obtain all necessary Revisions; releasing claims for {}", locksObtained);
for (final RevisionLock revisionLock : locksObtained) {
revisionLock.releaseClaim();
}
final FlowModification lastMod = failedClaimResult.getLastModification();
if (lastMod.getRevision().getClientId() == null || lastMod.getRevision().getClientId().trim().isEmpty() || lastMod.getRevision().getVersion() == null) {
throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.",
failedClaimResult.getProposedRevision(), lastMod.getRevision()));
} else {
throw new InvalidRevisionException(String.format("Component %s has been updated by '%s'. Please refresh to synchronize the view.",
failedClaimResult.getProposedRevision().getComponentId(), lastMod.getLastModifier()));
}
return new ArrayList<>(revisionMap.values());
}
@Override
public Revision getRevision(final String componentId) {
final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
return revisionLock.getRevision();
return revisionMap.computeIfAbsent(componentId, id -> new Revision(0L, null, componentId));
}
@Override
public <T> T deleteRevision(final RevisionClaim claim, final NiFiUser user, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException {
Objects.requireNonNull(user);
logger.debug("Attempting to delete revision using {}", claim);
int successCount = 0;
final List<Revision> revisionList = new ArrayList<>(claim.getRevisions());
revisionList.sort(new RevisionComparator());
// Verify the provided revisions.
String failedId = null;
for (final Revision revision : revisionList) {
final RevisionLock revisionLock = getRevisionLock(revision);
final boolean verified = revisionLock.requestWriteLock(revision, user);
if (verified) {
logger.trace("Verified Revision Claim for {}", revision);
successCount++;
} else {
logger.debug("Failed to verify Revision Claim for {}", revision);
failedId = revision.getComponentId();
break;
}
}
if (successCount == revisionList.size()) {
logger.debug("Successfully verified Revision Claim for all revisions {}", claim);
final T taskValue;
try {
taskValue = task.performTask();
} catch (final Exception e) {
logger.debug("Failed to perform Claim Deletion task. Will relinquish the Revision Claims for the following revisions: {}", revisionList);
for (final Revision revision : revisionList) {
final RevisionLock revisionLock = getRevisionLock(revision);
revisionLock.unlock(revision, revision, user.getIdentity());
logger.debug("Relinquished lock for {}", revision);
}
throw e;
}
for (final Revision revision : revisionList) {
deleteRevisionLock(revision);
logger.debug("Deleted Revision {}", revision);
}
return taskValue;
}
// We failed to obtain a thread lock for all revisions. Relinquish
// any Revision Claims that we have
for (int i = 0; i < successCount; i++) {
final Revision revision = revisionList.get(i);
final RevisionLock revisionLock = getRevisionLock(revision);
revisionLock.relinquishRevisionClaim(revision, null);
logger.debug("Relinquished lock for {}", revision);
}
// Throw an Exception indicating that we failed to obtain the locks
final Revision curRevision = getRevision(revision.getComponentId());
if (!curRevision.equals(revision)) {
throw new ExpiredRevisionClaimException("Invalid Revision was given for component with ID '" + failedId + "'");
}
}
// Perform the action provided
final T taskResult = task.performTask();
for (final Revision revision : revisionList) {
revisionMap.remove(revision.getComponentId());
}
return taskResult;
}
@Override
public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final NiFiUser user, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException {
Objects.requireNonNull(user);
int successCount = 0;
logger.debug("Attempting to update revision using {}", originalClaim);
final List<Revision> revisionList = new ArrayList<>(originalClaim.getRevisions());
@ -256,21 +104,16 @@ public class NaiveRevisionManager implements RevisionManager {
String failedId = null;
for (final Revision revision : revisionList) {
final RevisionLock revisionLock = getRevisionLock(revision);
final boolean verified = revisionLock.requestWriteLock(revision, user);
final Revision currentRevision = getRevision(revision.getComponentId());
final boolean verified = revision.equals(currentRevision);
if (verified) {
logger.trace("Verified Revision Claim for {}", revision);
successCount++;
} else {
logger.debug("Failed to verify Revision Claim for {}", revision);
failedId = revision.getComponentId();
break;
if (!verified) {
// Throw an Exception indicating that we failed to obtain the locks
throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + failedId + "'");
}
}
// We successfully verified all revisions.
if (successCount == revisionList.size()) {
logger.debug("Successfully verified Revision Claim for all revisions");
RevisionUpdate<T> updatedComponent = null;
@ -300,7 +143,7 @@ public class NaiveRevisionManager implements RevisionManager {
for (final Revision revision : revisionList) {
final Revision updatedRevision = updatedRevisions.get(revision);
getRevisionLock(revision).unlock(revision, updatedRevision, user.getIdentity());
revisionMap.put(updatedRevision.getComponentId(), updatedRevision);
if (updatedRevision.getVersion() != revision.getVersion()) {
logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion());
@ -312,467 +155,4 @@ public class NaiveRevisionManager implements RevisionManager {
return updatedComponent;
}
// We failed to obtain a thread lock for all revisions. Relinquish
// any Revision Claims that we have
for (int i = 0; i < successCount; i++) {
final Revision revision = revisionList.get(i);
final RevisionLock revisionLock = getRevisionLock(revision);
revisionLock.cancelWriteLock();
logger.debug("Relinquished lock for {}", revision);
}
// Throw an Exception indicating that we failed to obtain the locks
throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + failedId + "'");
}
@Override
public boolean releaseClaim(final RevisionClaim claim, final NiFiUser user) {
Objects.requireNonNull(user);
boolean success = true;
final List<Revision> revisions = new ArrayList<>(claim.getRevisions());
revisions.sort(new RevisionComparator());
for (final Revision revision : revisions) {
final RevisionLock revisionLock = getRevisionLock(revision);
success = revisionLock.relinquishRevisionClaim(revision, user) && success;
}
return success;
}
@Override
public boolean cancelClaim(String componentId) {
logger.debug("Attempting to cancel claim for component {}", componentId);
final Revision revision = new Revision(0L, null, componentId);
final RevisionLock revisionLock = getRevisionLock(revision);
if (revisionLock == null) {
logger.debug("No Revision Lock exists for Component {} - there is no claim to cancel", componentId);
return false;
}
return revisionLock.releaseClaimIfCurrentThread(null);
}
@Override
public boolean cancelClaim(Revision revision) {
logger.debug("Attempting to cancel claim for {}", revision);
final RevisionLock revisionLock = getRevisionLock(revision);
if (revisionLock == null) {
logger.debug("No Revision Lock exists for {} - there is no claim to cancel", revision);
return false;
}
return revisionLock.releaseClaimIfCurrentThread(revision);
}
@Override
public boolean cancelClaims(final Set<Revision> revisions) {
boolean successful = false;
for (final Revision revision : revisions) {
successful = cancelClaim(revision);
}
return successful;
}
@Override
public <T> T get(final String componentId, final ReadOnlyRevisionCallback<T> callback) {
final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision());
if (logger.isTraceEnabled()) {
logger.trace("Attempting to obtain read lock due to following stack trace", new RuntimeException("Exception for generating stack trace for debugging purposes"));
}
revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId());
logger.debug("Obtained read lock for {}", revisionLock.getRevision());
try {
return callback.withRevision(revisionLock.getRevision());
} finally {
logger.debug("Releasing read lock for {}", revisionLock.getRevision());
revisionLock.relinquishReadLock();
}
}
@Override
public <T> T get(final Set<String> componentIds, final Supplier<T> callback) {
final List<String> sortedIds = new ArrayList<>(componentIds);
sortedIds.sort(Collator.getInstance());
final Stack<RevisionLock> revisionLocks = new Stack<>();
logger.debug("Will attempt to obtain read locks for components {}", componentIds);
if (logger.isTraceEnabled()) {
logger.trace("Attempting to obtain read lock due to following stack trace", new RuntimeException("Exception for generating stack trace for debugging purposes"));
}
for (final String componentId : sortedIds) {
final RevisionLock revisionLock = getRevisionLock(new Revision(0L, null, componentId));
logger.trace("Attempting to obtain read lock for {}", revisionLock.getRevision());
revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId());
revisionLocks.push(revisionLock);
logger.trace("Obtained read lock for {}", revisionLock.getRevision());
}
logger.debug("Obtained read lock for all necessary components {}; calling call-back", componentIds);
try {
return callback.get();
} finally {
while (!revisionLocks.isEmpty()) {
final RevisionLock lock = revisionLocks.pop();
logger.debug("Releasing read lock for {}", lock.getRevision());
lock.relinquishReadLock();
}
}
}
private synchronized void deleteRevisionLock(final Revision revision) {
final RevisionLock revisionLock = revisionLockMap.remove(revision.getComponentId());
if (revisionLock == null) {
return;
}
revisionLock.releaseClaim();
}
private synchronized RevisionLock getRevisionLock(final Revision revision) {
return revisionLockMap.computeIfAbsent(revision.getComponentId(), id -> new RevisionLock(new FlowModification(revision, null), claimExpirationNanos));
}
private static class RevisionLock {
private final AtomicReference<FlowModification> lastModReference = new AtomicReference<>();
private final AtomicReference<LockStamp> lockStamp = new AtomicReference<>();
private final long lockNanos;
private final ReadWriteLock threadLock = new ReentrantReadWriteLock();
public RevisionLock(final FlowModification lastMod, final long lockNanos) {
this.lockNanos = lockNanos;
lastModReference.set(lastMod);
}
/**
* Requests that a Revision Claim be granted for the proposed Revision
*
* @param proposedRevision the revision to obtain a Claim for
*
* @return <code>true</code> if the Revision is valid and a Claim has been granted, <code>false</code> otherwise
*/
public ClaimResult requestClaim(final Revision proposedRevision, final NiFiUser user) {
// acquire the claim, blocking if necessary.
acquireClaim(user, proposedRevision.getClientId());
threadLock.writeLock().lock();
try {
// check if the revision is correct
final FlowModification lastModification = lastModReference.get();
final Revision currentRevision = lastModification.getRevision();
if (proposedRevision.equals(currentRevision)) {
// revision is correct - return true
return new ClaimResult(true, lastModification, proposedRevision);
}
// revision is incorrect. Release the Claim and return false
releaseClaim();
logger.debug("Cannot obtain Revision Claim {} because the Revision is out-of-date. Current revision is {}", proposedRevision, currentRevision);
return new ClaimResult(false, lastModification, proposedRevision);
} finally {
threadLock.writeLock().unlock();
}
}
/**
* Verifies that the given Revision has a Claim against it already and that the Claim belongs
* to the same client as the given Revision. If so, upgrades the Revision Claim to a lock that
* will not be relinquished until the {@link #unlock(Revision)} method is called.
*
* @param proposedRevision the current Revision
* @return <code>true</code> if the Revision Claim was upgraded to a lock, <code>false</code> otherwise
* @throws ExpiredRevisionClaimException if the Revision Claim for the given Revision has already expired
*/
public boolean requestWriteLock(final Revision proposedRevision, final NiFiUser user) throws ExpiredRevisionClaimException {
Objects.requireNonNull(proposedRevision);
threadLock.writeLock().lock();
boolean releaseLock = true;
try {
if (getRevision().equals(proposedRevision)) {
final LockStamp stamp = lockStamp.get();
if (stamp == null) {
final IllegalStateException ise = new IllegalStateException("No claim has been obtained for " + proposedRevision + " so cannot lock the component for modification");
logger.debug("Attempted to obtain write lock for {} but no Claim was obtained; throwing IllegalStateException", proposedRevision, ise);
throw ise;
}
final boolean userEqual = stamp.getUser() == null || stamp.getUser().equals(user);
if (!userEqual) {
logger.debug("Failed to verify {} because the User was not the same as the Lock Stamp's User (Lock Stamp was {})", proposedRevision, stamp);
throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed by " + stamp.getUser());
}
final boolean clientIdEqual = stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId());
if (!clientIdEqual) {
logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp);
throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed with a different Client ID");
}
// TODO - Must make sure that we don't have an expired stamp if it is the result of another
// operation taking a long time. I.e., Client A fires off two requests for Component X. If the
// first one takes 2 minutes to complete, it should not result in the second request getting
// rejected. I.e., we want to ensure that if the request is received before the Claim expired,
// that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended
// only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does
// not fulfill the second phase of the two-phase commit.
// We may need a Queue of updates (queue would need to be bounded, with a request getting
// rejected if queue is full).
if (stamp.isExpired()) {
throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired");
}
// Intentionally leave the thread lock in a locked state!
releaseLock = false;
return true;
}
} finally {
if (releaseLock) {
threadLock.writeLock().unlock();
}
}
return false;
}
private void acquireClaim(final NiFiUser user, final String clientId) {
while (true) {
final LockStamp stamp = lockStamp.get();
if (stamp == null || stamp.isExpired()) {
final long now = System.nanoTime();
final boolean lockObtained = lockStamp.compareAndSet(stamp, new LockStamp(user, clientId, now + lockNanos));
if (lockObtained) {
return;
}
} else {
Thread.yield();
}
}
}
public void acquireReadLock(final NiFiUser user, final String clientId) {
// Wait until we can claim the lock stamp
boolean obtained = false;
while (!obtained) {
// If the lock stamp is not null, then there is either an active Claim or a
// write lock held. Wait until it is null and then replace it atomically
// with a LockStamp that does not expire (expiration time is Long.MAX_VALUE).
final LockStamp curStamp = lockStamp.get();
final boolean nullOrExpired = (curStamp == null || curStamp.isExpired());
obtained = nullOrExpired && lockStamp.compareAndSet(curStamp, new LockStamp(user, clientId, Long.MAX_VALUE));
if (!obtained) {
// Could not obtain lock. Yield so that we don't sit around doing nothing with the thread.
Thread.yield();
}
}
// Now we can obtain the read lock without problem.
threadLock.readLock().lock();
}
public void relinquishReadLock() {
lockStamp.set(null);
threadLock.readLock().unlock();
}
private void releaseClaim() {
lockStamp.set(null);
}
public void clear() {
threadLock.writeLock().lock();
try {
releaseClaim();
} finally {
threadLock.writeLock().unlock();
}
}
public boolean releaseClaimIfCurrentThread(final Revision revision) {
threadLock.writeLock().lock();
try {
final LockStamp stamp = lockStamp.get();
if (stamp == null) {
logger.debug("Cannot cancel claim for {} because there is no claim held", getRevision());
return false;
}
if (revision != null && !getRevision().equals(revision)) {
throw new InvalidRevisionException("Cannot release claim because the provided Revision is not valid");
}
if (stamp.isObtainedByCurrentThread()) {
releaseClaim();
logger.debug("Successfully canceled claim for {}", getRevision());
return true;
}
logger.debug("Cannot cancel claim for {} because it is held by Thread {} and current Thread is {}",
getRevision(), stamp.obtainingThread, Thread.currentThread().getName());
return false;
} finally {
threadLock.writeLock().unlock();
}
}
/**
* Releases the Revision Claim if and only if the current revision matches the proposed revision
*
* @param proposedRevision the proposed revision to check against the current revision
* @return <code>true</code> if the Revision Claim was relinquished, <code>false</code> otherwise
*/
public boolean relinquishRevisionClaim(final Revision proposedRevision, final NiFiUser user) {
threadLock.writeLock().lock();
try {
final LockStamp stamp = lockStamp.get();
final boolean userOk = stamp == null || stamp.getUser().equals(user);
if (userOk) {
if (getRevision().equals(proposedRevision)) {
releaseClaim();
return true;
}
} else {
throw new InvalidRevisionException("Cannot relinquish claim for " + proposedRevision + " because it was claimed by " + stamp.getUser());
}
return false;
} finally {
threadLock.writeLock().unlock();
}
}
/**
* Releases the lock and any Revision Claim that is held for the given Revision and
* updates the revision
*
* @param proposedRevision the current Revision
* @param updatedRevision the Revision to update the current revision to
*/
public void unlock(final Revision proposedRevision, final Revision updatedRevision, final String modifier) {
final Revision curRevision = getRevision();
if (curRevision == null) {
throw new IllegalMonitorStateException("Cannot unlock " + proposedRevision + " because it is not locked");
}
if (!curRevision.equals(proposedRevision)) {
// Intentionally leave the thread lock in a locked state!
throw new IllegalMonitorStateException("Cannot unlock " + proposedRevision + " because the version is not valid");
}
lastModReference.set(new FlowModification(updatedRevision, modifier));
// Set stamp to null to indicate that it is not locked.
releaseClaim();
// Thread Lock should already be locked if this is called.
threadLock.writeLock().unlock();
}
public void cancelWriteLock() {
releaseClaim();
threadLock.writeLock().unlock();
}
/**
* Updates expiration time to the given timestamp
*
* @param timestamp the new expiration timestamp in nanoseconds
*/
public void renewExpiration(final long timestamp) {
final LockStamp stamp = lockStamp.get();
final NiFiUser user;
final String clientId;
if (stamp == null) {
user = null;
clientId = null;
} else {
user = stamp.getUser();
clientId = stamp.getClientId();
}
lockStamp.set(new LockStamp(user, clientId, timestamp));
}
public Revision getRevision() {
final FlowModification lastMod = lastModReference.get();
return (lastMod == null) ? null : lastMod.getRevision();
}
}
private static class LockStamp {
private final NiFiUser user;
private final String clientId;
private final long expirationTimestamp;
private final Thread obtainingThread;
public LockStamp(final NiFiUser user, final String clientId, final long expirationTimestamp) {
this.user = user;
this.clientId = clientId;
this.expirationTimestamp = expirationTimestamp;
this.obtainingThread = Thread.currentThread();
}
public NiFiUser getUser() {
return user;
}
public String getClientId() {
return clientId;
}
public boolean isExpired() {
return System.nanoTime() > expirationTimestamp;
}
public boolean isObtainedByCurrentThread() {
return obtainingThread == Thread.currentThread();
}
@Override
public String toString() {
return "LockStamp[user=" + user + ", clientId=" + clientId + ", expired=" + isExpired() + "]";
}
}
private static class ClaimResult {
private final boolean successful;
private final FlowModification lastMod;
private final Revision proposedRevision;
public ClaimResult(final boolean successful, final FlowModification lastMod, final Revision proposedRevision) {
this.successful = successful;
this.lastMod = lastMod;
this.proposedRevision = proposedRevision;
}
public boolean isSuccessful() {
return successful;
}
public FlowModification getLastModification() {
return lastMod;
}
public Revision getProposedRevision() {
return proposedRevision;
}
}
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.concurrent;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.junit.Before;
import org.junit.Test;
public class TestReentrantDistributedLock {
private ReadWriteLockSync sync;
@Before
public void setup() {
sync = new ReadWriteLockSync();
}
@Test(timeout = 5000)
public void testMultipleReadLocks() throws LockExpiredException {
final ReentrantDistributedLock lock = createReadLock();
final String id1 = lock.lock();
final String id2 = lock.lock();
assertEquals(id1, id2);
assertEquals(2, lock.getClaimCount());
lock.unlock(id1);
assertEquals(1, lock.getClaimCount());
lock.unlock(id2);
assertEquals(0, lock.getClaimCount());
}
@Test(timeout = 10000)
public void testMultipleWriteLocksBlock() throws LockExpiredException {
final ReentrantDistributedLock lock = createWriteLock();
final String id1 = lock.lock();
assertNotNull(id1);
final long startTime = System.nanoTime();
final String id2 = lock.tryLock(500, TimeUnit.MILLISECONDS);
assertNull(id2);
// We don't know exactly how long it will take to timeout because the time periods
// won't be exact, but it should take more than 350 milliseconds.
final long nanos = System.nanoTime() - startTime;
assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
lock.unlock(id1);
final String id3 = lock.tryLock(500, TimeUnit.MILLISECONDS);
assertNotNull(id3);
assertNotSame(id1, id3);
lock.unlock(id3);
}
@Test(timeout = 10000)
public void testReadLockBlocksWriteLock() throws LockExpiredException {
final ReentrantDistributedLock readLock = createReadLock();
final ReentrantDistributedLock writeLock = createWriteLock();
final String id1 = readLock.lock();
assertNotNull(id1);
final long startTime = System.nanoTime();
final String id2 = writeLock.tryLock(500, TimeUnit.MILLISECONDS);
assertNull(id2);
// We don't know exactly how long it will take to timeout because the time periods
// won't be exact, but it should take more than 350 milliseconds.
final long nanos = System.nanoTime() - startTime;
assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
readLock.unlock(id1);
final String id3 = writeLock.lock();
assertNotNull(id3);
assertNotSame(id1, id3);
writeLock.unlock(id3);
}
@Test(timeout = 10000)
public void testWriteLockBlocksReadLock() throws LockExpiredException {
final ReentrantDistributedLock readLock = createReadLock();
final ReentrantDistributedLock writeLock = createWriteLock();
final String id1 = writeLock.lock();
assertNotNull(id1);
final long startTime = System.nanoTime();
final String id2 = readLock.tryLock(500, TimeUnit.MILLISECONDS);
assertNull(id2);
// We don't know exactly how long it will take to timeout because the time periods
// won't be exact, but it should take more than 350 milliseconds.
final long nanos = System.nanoTime() - startTime;
assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
writeLock.unlock(id1);
final String id3 = readLock.lock();
assertNotNull(id3);
assertNotSame(id1, id3);
readLock.unlock(id3);
}
@Test(timeout = 10000)
public void testMultipleReadLocksBlockingOnWriteLock() throws InterruptedException, LockExpiredException {
final ReentrantDistributedLock readLock = createReadLock();
final ReentrantDistributedLock writeLock = createWriteLock();
final String id1 = writeLock.lock();
assertNotNull(id1);
final ExecutorService executor = Executors.newFixedThreadPool(3);
final AtomicReferenceArray<String> array = new AtomicReferenceArray<>(3);
for (int i = 0; i < 3; i++) {
final int index = i;
executor.submit(new Runnable() {
@Override
public void run() {
final String id = readLock.lock();
assertNotNull(id);
array.set(index, id);
}
});
}
// wait a bit and then make sure that no values have been set
Thread.sleep(250L);
for (int i = 0; i < 3; i++) {
assertNull(array.get(i));
}
// unlock so that the readers can lock.
writeLock.unlock(id1);
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
final String id = array.get(0);
assertNotNull(id);
for (int i = 0; i < 3; i++) {
assertEquals(id, array.get(i));
}
for (int i = 0; i < 3; i++) {
assertEquals(3 - i, readLock.getClaimCount());
readLock.unlock(id);
}
assertEquals(0, readLock.getClaimCount());
}
@Test(timeout = 10000)
public void testLockExpires() {
final ReentrantDistributedLock lock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 25, TimeUnit.MILLISECONDS);
final String id1 = lock.lock();
assertNotNull(id1);
final long start = System.nanoTime();
final String id2 = lock.lock();
final long nanos = System.nanoTime() - start;
assertNotNull(id2);
assertNotSame(id1, id2);
// The timeout may not entirely elapse but will be close. Give 5 milliseconds buffer
assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(20));
}
@Test(timeout = 10000)
public void testWithLock() throws LockExpiredException, Exception {
final ReentrantDistributedLock lock = createWriteLock();
final String id = lock.lock();
assertEquals(1, lock.getClaimCount());
final Object obj = new Object();
final Object returned = lock.withLock(id, () -> obj);
assertTrue(returned == obj);
assertEquals(1, lock.getClaimCount());
lock.unlock(id);
assertEquals(0, lock.getClaimCount());
}
private ReentrantDistributedLock createReadLock() {
return new ReentrantDistributedLock(LockMode.SHARED, sync, 30, TimeUnit.SECONDS);
}
private ReentrantDistributedLock createWriteLock() {
return new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 30, TimeUnit.SECONDS);
}
}

View File

@ -18,33 +18,13 @@
package org.apache.nifi.web.revision;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.StandardNiFiUser;
import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
import org.junit.Assert;
import org.junit.Test;
public class TestNaiveRevisionManager {
@ -72,644 +52,4 @@ public class TestNaiveRevisionManager {
return new StandardRevisionUpdate<Object>(null, new FlowModification(revision, null), additionalRevisions);
}
@Test
public void testTypicalFlow() throws ExpiredRevisionClaimException {
final RevisionManager revisionManager = new NaiveRevisionManager();
final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
revisionManager.updateRevision(claim, USER_1, () -> components(new Revision(1L, CLIENT_1, COMPONENT_1)));
final Revision updatedRevision = revisionManager.getRevision(originalRevision.getComponentId());
assertNotNull(updatedRevision);
assertEquals(originalRevision.getClientId(), updatedRevision.getClientId());
assertEquals(originalRevision.getComponentId(), updatedRevision.getComponentId());
assertEquals(1L, updatedRevision.getVersion().longValue());
}
@Test
public void testTypicalFlowWithLargeRevisionVersion() throws ExpiredRevisionClaimException {
final RevisionManager revisionManager = new NaiveRevisionManager();
final Revision originalRevision = new Revision(1000L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
revisionManager.updateRevision(claim, USER_1, () -> components(new Revision(1001L, CLIENT_1, COMPONENT_1)));
final Revision updatedRevision = revisionManager.getRevision(originalRevision.getComponentId());
assertNotNull(updatedRevision);
assertEquals(originalRevision.getClientId(), updatedRevision.getClientId());
assertEquals(originalRevision.getComponentId(), updatedRevision.getComponentId());
assertEquals(1001L, updatedRevision.getVersion().longValue());
}
@Test
public void testExpiration() throws InterruptedException {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MILLISECONDS);
final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
Thread.sleep(100);
try {
revisionManager.updateRevision(claim, USER_1, () -> components(originalRevision, claim.getRevisions()));
Assert.fail("Expected Revision Claim to have expired but it did not");
} catch (final ExpiredRevisionClaimException erce) {
// expected
}
}
@Test(timeout = 15000)
public void testConflictingClaimsFromDifferentClients() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS);
final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
final Revision differentClientRevision = new Revision(0L, "client-2", COMPONENT_1);
final long start = System.nanoTime();
final RevisionClaim differentClientClaim = revisionManager.requestClaim(differentClientRevision, USER_1);
final long nanos = System.nanoTime() - start;
// we should block for 2 seconds. But the timing won't necessarily be exact,
// so we ensure that it takes at least 1.5 seconds to provide a little wiggle room.
final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500);
assertTrue(nanos > minExpectedNanos);
// We should not get a Revision Claim because the revision is already claimed by a different client id
assertNotNull(differentClientClaim);
final Set<Revision> newRevisions = differentClientClaim.getRevisions();
assertEquals(1, newRevisions.size());
assertEquals(differentClientRevision, newRevisions.iterator().next());
}
@Test
public void testGetWithReadLockNoContention() {
final RevisionManager revisionManager = new NaiveRevisionManager(3, TimeUnit.SECONDS);
final Object returnedValue = revisionManager.get(COMPONENT_1, revision -> revision);
assertTrue(returnedValue instanceof Revision);
final Revision revision = (Revision) returnedValue;
assertEquals(0L, revision.getVersion().longValue());
assertNull(revision.getClientId());
assertEquals(COMPONENT_1, revision.getComponentId());
}
@Test(timeout = 10000)
public void testGetWithReadLockAndContentionWithTimeout() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS);
final Revision originalRevision = new Revision(8L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
final long start = System.nanoTime();
final Object returnValue = new Object();
final Object valueReturned = revisionManager.get(COMPONENT_1, revision -> returnValue);
final long nanos = System.nanoTime() - start;
final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500L);
assertTrue(nanos > minExpectedNanos);
assertEquals(returnValue, valueReturned);
}
@Test(timeout = 10000)
public void testGetWithReadLockAndContentionWithEventualLockResolution() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision originalRevision = new Revision(8L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
final Revision updatedRevision = new Revision(100L, CLIENT_1, COMPONENT_1);
// Create a thread that will hold the lock for 2 seconds and then will return an updated revision
final Thread updateRevisionThread = new Thread(new Runnable() {
@Override
public void run() {
try {
revisionManager.updateRevision(claim, USER_1, () -> {
// Wait 2 seconds and then return
try {
Thread.sleep(2000L);
} catch (Exception e) {
}
return components(updatedRevision);
});
} catch (ExpiredRevisionClaimException e) {
Assert.fail("Revision expired unexpectedly");
}
}
});
updateRevisionThread.start();
final long start = System.nanoTime();
final Object returnValue = new Object();
final Object valueReturned = revisionManager.get(COMPONENT_1, revision -> {
Assert.assertEquals(updatedRevision, revision);
return returnValue;
});
final long nanos = System.nanoTime() - start;
final long minExpectedNanos = TimeUnit.MILLISECONDS.toNanos(1500L);
assertTrue(nanos > minExpectedNanos);
assertEquals(returnValue, valueReturned);
}
@Test(timeout = 10000)
public void testDeleteRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(2L, CLIENT_1, COMPONENT_1);
final FlowModification mod = new FlowModification(secondRevision, "unit test");
revisionManager.updateRevision(firstClaim, USER_1, () -> new StandardRevisionUpdate<Void>(null, mod, null));
final Revision updatedRevision = revisionManager.getRevision(COMPONENT_1);
assertEquals(secondRevision, updatedRevision);
final RevisionClaim secondClaim = revisionManager.requestClaim(updatedRevision, USER_1);
assertNotNull(secondClaim);
final Object obj = new Object();
final Object ret = revisionManager.deleteRevision(secondClaim, USER_1, () -> obj);
assertEquals(obj, ret);
final Revision curRevision = revisionManager.getRevision(COMPONENT_1);
assertNotNull(curRevision);
assertEquals(0L, curRevision.getVersion().longValue());
assertNull(curRevision.getClientId());
assertEquals(COMPONENT_1, curRevision.getComponentId());
}
@Test(timeout = 10000)
public void testSameClientDifferentRevisionsDoNotBlockEachOther() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(1L, CLIENT_1, "component-2");
final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision, USER_1);
assertNotNull(secondClaim);
}
@Test(timeout = 10000)
public void testSameClientSameRevisionBlocks() throws InterruptedException, ExecutionException {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final Runnable runnable = new Runnable() {
@Override
public void run() {
revisionManager.requestClaim(secondRevision, USER_1);
}
};
final ExecutorService exec = Executors.newFixedThreadPool(1);
final Future<?> future = exec.submit(runnable);
try {
future.get(2, TimeUnit.SECONDS);
Assert.fail("Call to obtain claim on revision did not block when claim was already held");
} catch (TimeoutException e) {
// Expected
}
}
@Test(timeout = 10000)
public void testDifferentClientDifferentRevisionsDoNotBlockEachOther() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(1L, "client-2", "component-2");
final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision, USER_1);
assertNotNull(secondClaim);
}
@Test
public void testDifferentUserCannotClaimWriteLock() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final NiFiUser user2 = new StandardNiFiUser("user-2");
try {
revisionManager.updateRevision(firstClaim, user2, () -> null);
Assert.fail("Expected updateRevision to fail with a different user but it succeeded");
} catch (final InvalidRevisionException ire) {
// Expected behavior
}
}
@Test
public void testDifferentUserCannotDeleteRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final NiFiUser user2 = new StandardNiFiUser("user-2");
try {
revisionManager.deleteRevision(firstClaim, user2, () -> null);
Assert.fail("Expected deleteRevision to fail with a different user but it succeeded");
} catch (final InvalidRevisionException ire) {
// Expected behavior
}
}
@Test
public void testSameUserDifferentClientIdCannotDeleteRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision differentClientId = new Revision(1L, "client-2", COMPONENT_1);
final RevisionClaim differentClaimIdClaim = new StandardRevisionClaim(differentClientId);
try {
revisionManager.deleteRevision(differentClaimIdClaim, USER_1, () -> null);
Assert.fail("Expected deleteRevision to fail with a different user but it succeeded");
} catch (final InvalidRevisionException ire) {
// Expected behavior
}
}
@Test
public void testSameUserDifferentClientIdCannotClaimWriteLock() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision differentClientId = new Revision(1L, "client-2", COMPONENT_1);
final RevisionClaim differentClaimIdClaim = new StandardRevisionClaim(differentClientId);
try {
revisionManager.updateRevision(differentClaimIdClaim, USER_1, () -> null);
Assert.fail("Expected deleteRevision to fail with a different user but it succeeded");
} catch (final InvalidRevisionException ire) {
// Expected behavior
}
}
@Test(timeout = 10000)
public void testDifferentOrderedRevisionsDoNotCauseDeadlock() throws ExpiredRevisionClaimException, InterruptedException {
// Because we block before obtaining a claim on a revision if another client has the revision claimed,
// we should not have an issue if Client 1 requests a claim on revisions 'a' and 'b' while Client 2
// requests a claim on revisions 'b' and 'c' and Client 3 requests a claim on revisions 'c' and 'a'.
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision revision1a = new Revision(1L, "client-1", "a");
final Revision revision1b = new Revision(1L, "client-1", "b");
final Revision revision2b = new Revision(2L, "client-2", "b");
final Revision revision2c = new Revision(2L, "client-2", "c");
final Revision revision3c = new Revision(3L, "client-3", "c");
final Revision revision3a = new Revision(3L, "client-3", "a");
final RevisionClaim claim1 = revisionManager.requestClaim(Arrays.asList(revision1a, revision1b), USER_1);
assertNotNull(claim1);
final AtomicBoolean claim2Obtained = new AtomicBoolean(false);
final AtomicBoolean claim3Obtained = new AtomicBoolean(false);
final AtomicReference<RevisionClaim> claim2Ref = new AtomicReference<>();
final AtomicReference<RevisionClaim> claim3Ref = new AtomicReference<>();
new Thread(new Runnable() {
@Override
public void run() {
final RevisionClaim claim2 = revisionManager.requestClaim(Arrays.asList(revision2b, revision2c), USER_1);
assertNotNull(claim2);
claim2Obtained.set(true);
claim2Ref.set(claim2);
try {
revisionManager.updateRevision(claim2, USER_1, () -> components(new Revision(3L, "client-2", "b"), new Revision(3L, "client-2", "c")));
} catch (ExpiredRevisionClaimException e) {
Assert.fail("Revision unexpected expired");
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
final RevisionClaim claim3 = revisionManager.requestClaim(Arrays.asList(revision3c, revision3a), USER_1);
assertNotNull(claim3);
claim3Obtained.set(true);
claim3Ref.set(claim3);
try {
revisionManager.updateRevision(claim3Ref.get(), USER_1, () -> components(new Revision(2L, "client-3", "c"), new Revision(2L, "client-3", "a")));
} catch (ExpiredRevisionClaimException e) {
Assert.fail("Revision unexpected expired");
}
}
}).start();
Thread.sleep(250L);
assertFalse(claim2Obtained.get());
assertFalse(claim3Obtained.get());
revisionManager.updateRevision(claim1, USER_1, () -> components(new Revision(3L, "client-1", "a"), new Revision(2L, "client-1", "b")));
Thread.sleep(250L);
assertTrue(claim2Obtained.get() && claim3Obtained.get());
assertEquals(2L, revisionManager.getRevision("a").getVersion().longValue());
// The version for 'c' could be either 2 or 3, depending on which request completed first.
final long versionC = revisionManager.getRevision("c").getVersion().longValue();
assertTrue(versionC == 2 || versionC == 3);
assertEquals(3L, revisionManager.getRevision("b").getVersion().longValue());
}
@Test(timeout = 10000)
public void testReleaseClaim() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(claim);
final RevisionClaim invalidClaim = new StandardRevisionClaim(new Revision(2L, "client-2", COMPONENT_1));
assertFalse(revisionManager.releaseClaim(invalidClaim, USER_1));
assertTrue(revisionManager.releaseClaim(claim, USER_1));
}
@Test(timeout = 10000)
public void testCancelClaimSameThread() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(claim);
assertFalse(revisionManager.cancelClaim("component-2"));
assertTrue(revisionManager.cancelClaim(COMPONENT_1));
}
@Test(timeout = 10000)
public void testCancelClaimDifferentThread() throws InterruptedException {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(claim);
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
assertFalse(revisionManager.cancelClaim("component-2"));
assertFalse(revisionManager.cancelClaim(COMPONENT_1));
}
});
t.setDaemon(true);
t.start();
Thread.sleep(1000L);
assertTrue(revisionManager.cancelClaim(COMPONENT_1));
}
@Test(timeout = 10000)
public void testUpdateWithSomeWrongRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final Revision component2V1 = new Revision(1L, CLIENT_1, "component-2");
final RevisionClaim claim = revisionManager.requestClaim(Arrays.asList(component1V1, component2V1), USER_1);
assertNotNull(claim);
// Perform update but only update the revision for component-2
final Revision component1V2 = new Revision(2L, "client-2", COMPONENT_1);
revisionManager.updateRevision(claim, USER_1, new UpdateRevisionTask<Void>() {
@Override
public RevisionUpdate<Void> update() {
return new StandardRevisionUpdate<>(null, new FlowModification(component1V2, "unit test"));
}
});
// Obtain a claim with correct revisions
final Revision component2V2 = new Revision(2L, "client-2", "component-2");
revisionManager.requestClaim(Arrays.asList(component1V2, component2V1), USER_1);
// Attempt to update with incorrect revision for second component
final RevisionClaim wrongClaim = new StandardRevisionClaim(component1V2, component2V2);
final Revision component1V3 = new Revision(3L, CLIENT_1, COMPONENT_1);
try {
revisionManager.updateRevision(wrongClaim, USER_1,
() -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test"), Collections.emptySet()));
Assert.fail("Expected an Invalid Revision Exception");
} catch (final InvalidRevisionException ire) {
// expected
}
// release claim should fail because we are passing the wrong revision for component 2
assertFalse(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V2), USER_1));
// release claim should succeed because we are now using the proper revisions
assertTrue(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V1), USER_1));
// verify that we can update again.
final RevisionClaim thirdClaim = revisionManager.requestClaim(Arrays.asList(component1V2, component2V1), USER_1);
assertNotNull(thirdClaim);
revisionManager.updateRevision(thirdClaim, USER_1, () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test")));
}
@Test(timeout = 5000)
public void testResetWithoutClaimedRevisions() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final Revision component1V2 = new Revision(2L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(component1V1, USER_1);
revisionManager.updateRevision(claim, USER_1, () -> new StandardRevisionUpdate<>(COMPONENT_1, new FlowModification(component1V2, "unit test")));
final Revision retrievedRevision = revisionManager.getRevision(COMPONENT_1);
assertNotNull(retrievedRevision);
assertEquals(component1V2, retrievedRevision);
revisionManager.reset(Collections.singleton(new Revision(88L, CLIENT_1, COMPONENT_1)));
final Revision retrievedAfterClear = revisionManager.getRevision(COMPONENT_1);
assertNotNull(retrievedAfterClear);
assertEquals(88L, retrievedAfterClear.getVersion().longValue());
final Revision component1V500 = new Revision(500L, "new client id", COMPONENT_1);
try {
revisionManager.requestClaim(component1V500, USER_1);
Assert.fail("Expected InvalidRevisionException was but able to claim revision with wrong version");
} catch (final InvalidRevisionException ire) {
// Expected
}
}
@Test(timeout = 5000)
public void testResetWithClaimedRevisions() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim);
revisionManager.reset(Collections.singleton(new Revision(88L, CLIENT_1, COMPONENT_1)));
final Revision retrievedAfterClear = revisionManager.getRevision(COMPONENT_1);
assertNotNull(retrievedAfterClear);
assertEquals(88L, retrievedAfterClear.getVersion().longValue());
// Should now be able to claim any version that I want
final Revision component1V500 = new Revision(500L, "new client id", COMPONENT_1);
try {
revisionManager.requestClaim(component1V500, USER_1);
Assert.fail("Expected InvalidRevisionException was but able to claim revision with wrong version");
} catch (final InvalidRevisionException ire) {
// Expected
}
}
@Test
public void testGetAllRevisions() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final Revision component1V2 = new Revision(2L, CLIENT_1, COMPONENT_1);
final Revision component2V1 = new Revision(1L, "client-2", "component-2");
final RevisionClaim claim1 = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim1);
revisionManager.updateRevision(claim1, USER_1, () -> new StandardRevisionUpdate<>(COMPONENT_1, new FlowModification(component1V2, "unit test")));
assertNotNull(revisionManager.requestClaim(component2V1, USER_1));
final List<Revision> revisions = revisionManager.getAllRevisions();
assertNotNull(revisions);
assertEquals(2, revisions.size());
boolean component1Seen = false, component2Seen = false;
for (final Revision revision : revisions) {
final String componentId = revision.getComponentId();
if (componentId.equals(COMPONENT_1)) {
assertEquals(component1V2, revision);
component1Seen = true;
} else if (componentId.equals("component-2")) {
assertEquals(component2V1, revision);
component2Seen = true;
} else {
Assert.fail("Got revision for unexpected component: " + revision);
}
}
assertTrue(component1Seen);
assertTrue(component2Seen);
}
@Test(timeout = 5000)
public void testWriteLockReleasedWhenClaimCanceledByRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim);
assertEquals(1, claim.getRevisions().size());
assertEquals(component1V1, claim.getRevisions().iterator().next());
assertTrue(revisionManager.cancelClaim(component1V1));
final RevisionClaim claim2 = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim2);
assertEquals(1, claim2.getRevisions().size());
assertEquals(component1V1, claim2.getRevisions().iterator().next());
}
@Test(timeout = 5000)
public void testWriteLockReleasedWhenClaimCanceledByComponentId() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim);
assertEquals(1, claim.getRevisions().size());
assertEquals(component1V1, claim.getRevisions().iterator().next());
assertTrue(revisionManager.cancelClaim(COMPONENT_1));
final RevisionClaim claim2 = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim2);
assertEquals(1, claim2.getRevisions().size());
assertEquals(component1V1, claim2.getRevisions().iterator().next());
}
@Test(timeout = 5000)
public void testDeleteRevisionUnlocksClaimIfExceptionThrown() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim);
assertEquals(1, claim.getRevisions().size());
assertEquals(component1V1, claim.getRevisions().iterator().next());
final RuntimeException re = new RuntimeException("Intentional Unit Test Exception");
try {
revisionManager.deleteRevision(claim, USER_1, () -> {
throw re;
});
Assert.fail("deleteRevision() method did not propagate Exception thrown");
} catch (final RuntimeException e) {
assertTrue(re == e);
}
// Ensure that we can obtain a read lock
revisionManager.get(COMPONENT_1, rev -> rev);
final RevisionClaim claim2 = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim2);
assertEquals(1, claim2.getRevisions().size());
assertEquals(component1V1, claim2.getRevisions().iterator().next());
}
@Test(timeout = 5000)
public void testUpdateRevisionUnlocksClaimIfExceptionThrown() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim);
assertEquals(1, claim.getRevisions().size());
assertEquals(component1V1, claim.getRevisions().iterator().next());
final RuntimeException re = new RuntimeException("Intentional Unit Test Exception");
try {
revisionManager.updateRevision(claim, USER_1, () -> {
throw re;
});
Assert.fail("updateRevision() method did not propagate Exception thrown");
} catch (final RuntimeException e) {
assertTrue(re == e);
}
// Ensure that we can obtain a read lock
revisionManager.get(COMPONENT_1, rev -> rev);
final RevisionClaim claim2 = revisionManager.requestClaim(component1V1, USER_1);
assertNotNull(claim2);
assertEquals(1, claim2.getRevisions().size());
assertEquals(component1V1, claim2.getRevisions().iterator().next());
}
}