From 9b0a2cb321b74862f3fd9ab3b2aa97e2a9aa9675 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 11 Nov 2016 15:00:16 -0800 Subject: [PATCH] YARN-5825. ProportionalPreemptionalPolicy should use readLock over LeafQueue instead of synchronized block. Contributed by Sunil G (cherry picked from commit fad9609d13e76e9e3a4e01c96f698bb60b03807e) --- .../monitor/capacity/FifoCandidatesSelector.java | 5 ++++- .../capacity/IntraQueueCandidatesSelector.java | 5 ++++- .../ProportionalCapacityPreemptionPolicy.java | 14 +++++++++++--- .../scheduler/capacity/AbstractCSQueue.java | 6 +++++- .../scheduler/capacity/CSQueue.java | 7 +++++++ ...ionalCapacityPreemptionPolicyMockFramework.java | 3 +++ .../TestProportionalCapacityPreemptionPolicy.java | 5 +++++ 7 files changed, 39 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 39336a4901c..f4d7e92a110 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -89,7 +89,8 @@ public class FifoCandidatesSelector .getResToObtainByPartitionForLeafQueue(preemptionContext, queueName, clusterResource); - synchronized (leafQueue) { + try { + leafQueue.getReadLock().lock(); // go through all ignore-partition-exclusivity containers first to make // sure such containers will be preemptionCandidates first Map> ignorePartitionExclusivityContainers = @@ -147,6 +148,8 @@ public class FifoCandidatesSelector preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist, resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, totalPreemptionAllowed); + } finally { + leafQueue.getReadLock().unlock(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 039b53e667e..4f2b272cf7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -118,7 +118,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { // 6. Based on the selected resource demand per partition, select // containers with known policy from inter-queue preemption. - synchronized (leafQueue) { + try { + leafQueue.getReadLock().lock(); Iterator desc = leafQueue.getOrderingPolicy() .getPreemptionIterator(); while (desc.hasNext()) { @@ -127,6 +128,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { totalPreemptedResourceAllowed, resToObtainByPartition, leafQueue, app); } + } finally { + leafQueue.getReadLock().unlock(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 53470740b41..324e8453a61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -56,6 +56,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; /** * This class implement a {@link SchedulingEditPolicy} that is designed to be @@ -430,15 +431,19 @@ public class ProportionalCapacityPreemptionPolicy private TempQueuePerPartition cloneQueues(CSQueue curQueue, Resource partitionResource, String partitionToLookAt) { TempQueuePerPartition ret; - synchronized (curQueue) { + ReadLock readLock = curQueue.getReadLock(); + try { + // Acquire a read lock from Parent/LeafQueue. + readLock.lock(); + String queueName = curQueue.getQueueName(); QueueCapacities qc = curQueue.getQueueCapacities(); float absCap = qc.getAbsoluteCapacity(partitionToLookAt); float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); boolean preemptionDisabled = curQueue.getPreemptionDisabled(); - Resource current = Resources.clone( - curQueue.getQueueResourceUsage().getUsed(partitionToLookAt)); + Resource current = Resources + .clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt)); Resource killable = Resources.none(); Resource reserved = Resources.clone( @@ -472,7 +477,10 @@ public class ProportionalCapacityPreemptionPolicy ret.addChild(subq); } } + } finally { + readLock.unlock(); } + addTempQueuePartition(ret); return ret; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 7e18b293734..3daabafd53f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -432,7 +432,6 @@ public abstract class AbstractCSQueue implements CSQueue { } finally { readLock.unlock(); } - } @Private @@ -450,6 +449,11 @@ public abstract class AbstractCSQueue implements CSQueue { return queueUsage; } + @Override + public ReentrantReadWriteLock.ReadLock getReadLock() { + return readLock; + } + /** * The specified queue is preemptable if system-wide preemption is turned on * unless any queue in the qPath hierarchy has explicitly turned diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index e5cbd04682e..baf60e4f309 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -355,4 +356,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { void apply(Resource cluster, ResourceCommitRequest request); + + /** + * Get readLock associated with the Queue. + * @return readLock of corresponding queue. + */ + public ReentrantReadWriteLock.ReadLock getReadLock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 5b8425ba199..0281c198fc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -70,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -525,6 +526,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { queue = leafQueue; } + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + when(queue.getReadLock()).thenReturn(lock.readLock()); setupQueue(queue, q, queueExprArray, idx); if (queue.getQueueName().equals(ROOT)) { rootQueue = (ParentQueue) queue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 0f5d526e216..881004c64c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -74,6 +74,7 @@ import java.util.NavigableSet; import java.util.Random; import java.util.StringTokenizer; import java.util.TreeSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; @@ -1203,6 +1204,8 @@ public class TestProportionalCapacityPreemptionPolicy { ParentQueue pq = mock(ParentQueue.class); List cqs = new ArrayList(); when(pq.getChildQueues()).thenReturn(cqs); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + when(pq.getReadLock()).thenReturn(lock.readLock()); for (int i = 0; i < subqueues; ++i) { pqs.add(pq); } @@ -1264,6 +1267,8 @@ public class TestProportionalCapacityPreemptionPolicy { if(setAMResourcePercent != 0.0f){ when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent); } + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + when(lq.getReadLock()).thenReturn(lock.readLock()); p.getChildQueues().add(lq); return lq; }