From 87e062ff557d18aa3f1f7e2906357c81236f0328 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 7 Jul 2017 16:07:15 -0400 Subject: [PATCH] NIFI-4167: StandardResourceClaimManager should not synchronize on a ResourceClaim in order to determine the claim count. This closes #1996 --- .../TestStandardResourceClaimManager.java | 33 +++++++++++++++++++ .../claim/StandardResourceClaimManager.java | 13 +++++--- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java index 867810ee28..f4fe120e8d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java @@ -18,8 +18,13 @@ package org.apache.nifi.controller.repository.claim; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; @@ -28,6 +33,34 @@ import org.junit.Test; public class TestStandardResourceClaimManager { + + @Test(timeout = 10000) + public void testGetClaimantCountWhileMarkingDestructable() throws InterruptedException, ExecutionException { + final StandardResourceClaimManager manager = new StandardResourceClaimManager(); + + for (int i = 0; i < 50000; i++) { + final ResourceClaim rc = manager.newResourceClaim("container", "section", String.valueOf(i), false, false); + manager.markDestructable(rc); + } + + final Object completedObject = new Object(); + final CompletableFuture future = new CompletableFuture<>(); + final ResourceClaim lastClaim = manager.newResourceClaim("container", "section", "lastOne", false, false); + final Thread backgroundThread = new Thread(() -> { + manager.markDestructable(lastClaim); + future.complete(completedObject); + }); + + backgroundThread.start(); + + Thread.sleep(10); + assertEquals(0, manager.getClaimantCount(lastClaim)); + assertNull(future.getNow(null)); + manager.drainDestructableClaims(new ArrayList<>(), 1); + assertTrue(completedObject == future.get()); + } + + @Test @Ignore("Unit test was created to repeat a concurrency bug in StandardResourceClaimManager. " + "However, now that the concurrency bug has been fixed, the test will deadlock. Leaving here for now in case it's valuable before the commit is pushed") diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java index e4f060e64b..92b65ec807 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java @@ -70,10 +70,15 @@ public class StandardResourceClaimManager implements ResourceClaimManager { return 0; } - synchronized (claim) { - final ClaimCount counter = claimantCounts.get(claim); - return counter == null ? 0 : counter.getCount().get(); - } + // No need to synchronize on the Resource Claim here, since this is simply obtaining a value. + // We synchronize elsewhere because we want to atomically perform multiple operations, such as + // getting the claimant count and then updating a queue. However, the operation of obtaining + // the ClaimCount and getting its count value has no side effect and therefore can be performed + // without synchronization (since the claimantCounts map and the ClaimCount are also both thread-safe + // and there is no need for the two actions of obtaining the ClaimCount and getting its Count value + // to be performed atomically). + final ClaimCount counter = claimantCounts.get(claim); + return counter == null ? 0 : counter.getCount().get(); } @Override