mirror of https://github.com/apache/nifi.git
NIFI-4167: StandardResourceClaimManager should not synchronize on a ResourceClaim in order to determine the claim count. This closes #1996
This commit is contained in:
parent
2dc45a4dd7
commit
87e062ff55
|
@ -18,8 +18,13 @@
|
||||||
package org.apache.nifi.controller.repository.claim;
|
package org.apache.nifi.controller.repository.claim;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -28,6 +33,34 @@ import org.junit.Test;
|
||||||
|
|
||||||
public class TestStandardResourceClaimManager {
|
public class TestStandardResourceClaimManager {
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testGetClaimantCountWhileMarkingDestructable() throws InterruptedException, ExecutionException {
|
||||||
|
final StandardResourceClaimManager manager = new StandardResourceClaimManager();
|
||||||
|
|
||||||
|
for (int i = 0; i < 50000; i++) {
|
||||||
|
final ResourceClaim rc = manager.newResourceClaim("container", "section", String.valueOf(i), false, false);
|
||||||
|
manager.markDestructable(rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Object completedObject = new Object();
|
||||||
|
final CompletableFuture<Object> future = new CompletableFuture<>();
|
||||||
|
final ResourceClaim lastClaim = manager.newResourceClaim("container", "section", "lastOne", false, false);
|
||||||
|
final Thread backgroundThread = new Thread(() -> {
|
||||||
|
manager.markDestructable(lastClaim);
|
||||||
|
future.complete(completedObject);
|
||||||
|
});
|
||||||
|
|
||||||
|
backgroundThread.start();
|
||||||
|
|
||||||
|
Thread.sleep(10);
|
||||||
|
assertEquals(0, manager.getClaimantCount(lastClaim));
|
||||||
|
assertNull(future.getNow(null));
|
||||||
|
manager.drainDestructableClaims(new ArrayList<>(), 1);
|
||||||
|
assertTrue(completedObject == future.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore("Unit test was created to repeat a concurrency bug in StandardResourceClaimManager. "
|
@Ignore("Unit test was created to repeat a concurrency bug in StandardResourceClaimManager. "
|
||||||
+ "However, now that the concurrency bug has been fixed, the test will deadlock. Leaving here for now in case it's valuable before the commit is pushed")
|
+ "However, now that the concurrency bug has been fixed, the test will deadlock. Leaving here for now in case it's valuable before the commit is pushed")
|
||||||
|
|
|
@ -70,11 +70,16 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized (claim) {
|
// No need to synchronize on the Resource Claim here, since this is simply obtaining a value.
|
||||||
|
// We synchronize elsewhere because we want to atomically perform multiple operations, such as
|
||||||
|
// getting the claimant count and then updating a queue. However, the operation of obtaining
|
||||||
|
// the ClaimCount and getting its count value has no side effect and therefore can be performed
|
||||||
|
// without synchronization (since the claimantCounts map and the ClaimCount are also both thread-safe
|
||||||
|
// and there is no need for the two actions of obtaining the ClaimCount and getting its Count value
|
||||||
|
// to be performed atomically).
|
||||||
final ClaimCount counter = claimantCounts.get(claim);
|
final ClaimCount counter = claimantCounts.get(claim);
|
||||||
return counter == null ? 0 : counter.getCount().get();
|
return counter == null ? 0 : counter.getCount().get();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int decrementClaimantCount(final ResourceClaim claim) {
|
public int decrementClaimantCount(final ResourceClaim claim) {
|
||||||
|
|
Loading…
Reference in New Issue