YARN-4671. There is no need to acquire CS lock when completing a container. Contributed by Meng Ding

This commit is contained in:
Jian He 2016-03-01 13:14:12 -08:00
parent 2c8496ebf3
commit 5c465df904
3 changed files with 35 additions and 11 deletions

View File

@ -1480,6 +1480,9 @@ Release 2.8.0 - UNRELEASED
YARN-4748. ApplicationHistoryManagerOnTimelineStore should not
swallow exceptions on generateApplicationReport. (Li Lu via jianhe)
YARN-4671. There is no need to acquire CS lock when completing a container.
(Meng Ding via jianhe)
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -228,7 +228,7 @@ public class CapacityScheduler extends
private AsyncScheduleThread asyncSchedulerThread;
private RMNodeLabelsManager labelManager;
private SchedulerHealth schedulerHealth = new SchedulerHealth();
long lastNodeUpdateTime;
volatile long lastNodeUpdateTime;
/**
* EXPERT
@ -928,7 +928,6 @@ public class CapacityScheduler extends
}
@Override
// Note: when AM asks to release container, we will acquire scheduler lock
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
@ -1546,9 +1545,8 @@ public class CapacityScheduler extends
}
}
@Lock(CapacityScheduler.class)
@Override
protected synchronized void completedContainerInternal(
protected void completedContainerInternal(
RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event) {
@ -1957,7 +1955,7 @@ public class CapacityScheduler extends
return this.schedulerHealth;
}
private synchronized void setLastNodeUpdateTime(long time) {
private void setLastNodeUpdateTime(long time) {
this.lastNodeUpdateTime = time;
}

View File

@ -1078,9 +1078,33 @@ public class TestCapacityScheduler {
RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
client.registerApplicationMaster(request);
// Allocate a container
List<ResourceRequest> asks = Collections.singletonList(
ResourceRequest.newInstance(
Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1));
AllocateRequest allocateRequest =
AllocateRequest.newInstance(0, 0.0f, asks, null, null);
client.allocate(allocateRequest);
// Make sure the container is allocated in RM
nm1.nodeHeartbeat(true);
ContainerId containerId2 =
ContainerId.newContainerId(applicationAttemptId, 2);
Assert.assertTrue(rm.waitForState(nm1, containerId2,
RMContainerState.ALLOCATED, 10 * 1000));
// Acquire the container
allocateRequest = AllocateRequest.newInstance(1, 0.0f, null, null, null);
client.allocate(allocateRequest);
// Launch the container
final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMContainer rmContainer = cs.getRMContainer(containerId2);
rmContainer.handle(
new RMContainerEvent(containerId2, RMContainerEventType.LAUNCHED));
// grab the scheduler lock from another thread
// and verify an allocate call in this thread doesn't block on it
final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
final CyclicBarrier barrier = new CyclicBarrier(2);
Thread otherThread = new Thread(new Runnable() {
@Override
@ -1089,9 +1113,7 @@ public class TestCapacityScheduler {
try {
barrier.await();
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
@ -1099,8 +1121,9 @@ public class TestCapacityScheduler {
});
otherThread.start();
barrier.await();
AllocateRequest allocateRequest =
AllocateRequest.newInstance(0, 0.0f, null, null, null);
List<ContainerId> release = Collections.singletonList(containerId2);
allocateRequest =
AllocateRequest.newInstance(2, 0.0f, null, release, null);
client.allocate(allocateRequest);
barrier.await();
otherThread.join();