YARN-5825. ProportionalPreemptionalPolicy should use readLock over LeafQueue instead of synchronized block. Contributed by Sunil G

This commit is contained in:
Jian He 2016-11-11 15:00:16 -08:00
parent 3d26717777
commit fad9609d13
7 changed files with 39 additions and 6 deletions

View File

@ -89,7 +89,8 @@ public class FifoCandidatesSelector
.getResToObtainByPartitionForLeafQueue(preemptionContext, .getResToObtainByPartitionForLeafQueue(preemptionContext,
queueName, clusterResource); queueName, clusterResource);
synchronized (leafQueue) { try {
leafQueue.getReadLock().lock();
// go through all ignore-partition-exclusivity containers first to make // go through all ignore-partition-exclusivity containers first to make
// sure such containers will be preemptionCandidates first // sure such containers will be preemptionCandidates first
Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers = Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
@ -147,6 +148,8 @@ public class FifoCandidatesSelector
preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist, preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
totalPreemptionAllowed); totalPreemptionAllowed);
} finally {
leafQueue.getReadLock().unlock();
} }
} }

View File

@ -118,7 +118,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
// 6. Based on the selected resource demand per partition, select // 6. Based on the selected resource demand per partition, select
// containers with known policy from inter-queue preemption. // containers with known policy from inter-queue preemption.
synchronized (leafQueue) { try {
leafQueue.getReadLock().lock();
Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy() Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
.getPreemptionIterator(); .getPreemptionIterator();
while (desc.hasNext()) { while (desc.hasNext()) {
@ -127,6 +128,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
totalPreemptedResourceAllowed, resToObtainByPartition, totalPreemptedResourceAllowed, resToObtainByPartition,
leafQueue, app); leafQueue, app);
} }
} finally {
leafQueue.getReadLock().unlock();
} }
} }
} }

View File

@ -56,6 +56,7 @@ import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
/** /**
* This class implement a {@link SchedulingEditPolicy} that is designed to be * This class implement a {@link SchedulingEditPolicy} that is designed to be
@ -430,15 +431,19 @@ public class ProportionalCapacityPreemptionPolicy
private TempQueuePerPartition cloneQueues(CSQueue curQueue, private TempQueuePerPartition cloneQueues(CSQueue curQueue,
Resource partitionResource, String partitionToLookAt) { Resource partitionResource, String partitionToLookAt) {
TempQueuePerPartition ret; TempQueuePerPartition ret;
synchronized (curQueue) { ReadLock readLock = curQueue.getReadLock();
try {
// Acquire a read lock from Parent/LeafQueue.
readLock.lock();
String queueName = curQueue.getQueueName(); String queueName = curQueue.getQueueName();
QueueCapacities qc = curQueue.getQueueCapacities(); QueueCapacities qc = curQueue.getQueueCapacities();
float absCap = qc.getAbsoluteCapacity(partitionToLookAt); float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
boolean preemptionDisabled = curQueue.getPreemptionDisabled(); boolean preemptionDisabled = curQueue.getPreemptionDisabled();
Resource current = Resources.clone( Resource current = Resources
curQueue.getQueueResourceUsage().getUsed(partitionToLookAt)); .clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
Resource killable = Resources.none(); Resource killable = Resources.none();
Resource reserved = Resources.clone( Resource reserved = Resources.clone(
@ -472,7 +477,10 @@ public class ProportionalCapacityPreemptionPolicy
ret.addChild(subq); ret.addChild(subq);
} }
} }
} finally {
readLock.unlock();
} }
addTempQueuePartition(ret); addTempQueuePartition(ret);
return ret; return ret;
} }

View File

@ -432,7 +432,6 @@ public abstract class AbstractCSQueue implements CSQueue {
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
@Private @Private
@ -450,6 +449,11 @@ public abstract class AbstractCSQueue implements CSQueue {
return queueUsage; return queueUsage;
} }
@Override
public ReentrantReadWriteLock.ReadLock getReadLock() {
return readLock;
}
/** /**
* The specified queue is preemptable if system-wide preemption is turned on * The specified queue is preemptable if system-wide preemption is turned on
* unless any queue in the <em>qPath</em> hierarchy has explicitly turned * unless any queue in the <em>qPath</em> hierarchy has explicitly turned

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -355,4 +356,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
void apply(Resource cluster, void apply(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request); ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request);
/**
* Get readLock associated with the Queue.
* @return readLock of corresponding queue.
*/
public ReentrantReadWriteLock.ReadLock getReadLock();
} }

View File

@ -70,6 +70,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
@ -525,6 +526,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
queue = leafQueue; queue = leafQueue;
} }
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(queue.getReadLock()).thenReturn(lock.readLock());
setupQueue(queue, q, queueExprArray, idx); setupQueue(queue, q, queueExprArray, idx);
if (queue.getQueueName().equals(ROOT)) { if (queue.getQueueName().equals(ROOT)) {
rootQueue = (ParentQueue) queue; rootQueue = (ParentQueue) queue;

View File

@ -74,6 +74,7 @@ import java.util.NavigableSet;
import java.util.Random; import java.util.Random;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
@ -1203,6 +1204,8 @@ public class TestProportionalCapacityPreemptionPolicy {
ParentQueue pq = mock(ParentQueue.class); ParentQueue pq = mock(ParentQueue.class);
List<CSQueue> cqs = new ArrayList<CSQueue>(); List<CSQueue> cqs = new ArrayList<CSQueue>();
when(pq.getChildQueues()).thenReturn(cqs); when(pq.getChildQueues()).thenReturn(cqs);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(pq.getReadLock()).thenReturn(lock.readLock());
for (int i = 0; i < subqueues; ++i) { for (int i = 0; i < subqueues; ++i) {
pqs.add(pq); pqs.add(pq);
} }
@ -1264,6 +1267,8 @@ public class TestProportionalCapacityPreemptionPolicy {
if(setAMResourcePercent != 0.0f){ if(setAMResourcePercent != 0.0f){
when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent); when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
} }
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(lq.getReadLock()).thenReturn(lock.readLock());
p.getChildQueues().add(lq); p.getChildQueues().add(lq);
return lq; return lq;
} }