YARN-5825. ProportionalPreemptionalPolicy should use readLock over LeafQueue instead of synchronized block. Contributed by Sunil G

(cherry picked from commit fad9609d13)
This commit is contained in:
Jian He 2016-11-11 15:00:16 -08:00
parent 5613b0818c
commit 9b0a2cb321
7 changed files with 39 additions and 6 deletions

View File

@ -89,7 +89,8 @@ public class FifoCandidatesSelector
.getResToObtainByPartitionForLeafQueue(preemptionContext,
queueName, clusterResource);
synchronized (leafQueue) {
try {
leafQueue.getReadLock().lock();
// go through all ignore-partition-exclusivity containers first to make
// sure such containers will be preemptionCandidates first
Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
@ -147,6 +148,8 @@ public class FifoCandidatesSelector
preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
totalPreemptionAllowed);
} finally {
leafQueue.getReadLock().unlock();
}
}

View File

@ -118,7 +118,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
// 6. Based on the selected resource demand per partition, select
// containers with known policy from inter-queue preemption.
synchronized (leafQueue) {
try {
leafQueue.getReadLock().lock();
Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
.getPreemptionIterator();
while (desc.hasNext()) {
@ -127,6 +128,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
totalPreemptedResourceAllowed, resToObtainByPartition,
leafQueue, app);
}
} finally {
leafQueue.getReadLock().unlock();
}
}
}

View File

@ -56,6 +56,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
/**
* This class implement a {@link SchedulingEditPolicy} that is designed to be
@ -430,15 +431,19 @@ public class ProportionalCapacityPreemptionPolicy
private TempQueuePerPartition cloneQueues(CSQueue curQueue,
Resource partitionResource, String partitionToLookAt) {
TempQueuePerPartition ret;
synchronized (curQueue) {
ReadLock readLock = curQueue.getReadLock();
try {
// Acquire a read lock from Parent/LeafQueue.
readLock.lock();
String queueName = curQueue.getQueueName();
QueueCapacities qc = curQueue.getQueueCapacities();
float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
Resource current = Resources.clone(
curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
Resource current = Resources
.clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
Resource killable = Resources.none();
Resource reserved = Resources.clone(
@ -472,7 +477,10 @@ public class ProportionalCapacityPreemptionPolicy
ret.addChild(subq);
}
}
} finally {
readLock.unlock();
}
addTempQueuePartition(ret);
return ret;
}

View File

@ -432,7 +432,6 @@ public abstract class AbstractCSQueue implements CSQueue {
} finally {
readLock.unlock();
}
}
@Private
@ -450,6 +449,11 @@ public abstract class AbstractCSQueue implements CSQueue {
return queueUsage;
}
@Override
public ReentrantReadWriteLock.ReadLock getReadLock() {
return readLock;
}
/**
* The specified queue is preemptable if system-wide preemption is turned on
* unless any queue in the <em>qPath</em> hierarchy has explicitly turned

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -355,4 +356,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
void apply(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request);
/**
* Get readLock associated with the Queue.
* @return readLock of corresponding queue.
*/
public ReentrantReadWriteLock.ReadLock getReadLock();
}

View File

@ -70,6 +70,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@ -525,6 +526,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
queue = leafQueue;
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(queue.getReadLock()).thenReturn(lock.readLock());
setupQueue(queue, q, queueExprArray, idx);
if (queue.getQueueName().equals(ROOT)) {
rootQueue = (ParentQueue) queue;

View File

@ -74,6 +74,7 @@ import java.util.NavigableSet;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
@ -1203,6 +1204,8 @@ public class TestProportionalCapacityPreemptionPolicy {
ParentQueue pq = mock(ParentQueue.class);
List<CSQueue> cqs = new ArrayList<CSQueue>();
when(pq.getChildQueues()).thenReturn(cqs);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(pq.getReadLock()).thenReturn(lock.readLock());
for (int i = 0; i < subqueues; ++i) {
pqs.add(pq);
}
@ -1264,6 +1267,8 @@ public class TestProportionalCapacityPreemptionPolicy {
if(setAMResourcePercent != 0.0f){
when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(lq.getReadLock()).thenReturn(lock.readLock());
p.getChildQueues().add(lq);
return lq;
}