YARN-4671. There is no need to acquire CS lock when completing a container. Contributed by Meng Ding
This commit is contained in:
parent
cc28118eed
commit
c50f79d838
|
@ -1231,6 +1231,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4748. ApplicationHistoryManagerOnTimelineStore should not
|
||||
swallow exceptions on generateApplicationReport. (Li Lu via jianhe)
|
||||
|
||||
YARN-4671. There is no need to acquire CS lock when completing a container.
|
||||
(Meng Ding via jianhe)
|
||||
|
||||
Release 2.7.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -227,7 +227,7 @@ public class CapacityScheduler extends
|
|||
private AsyncScheduleThread asyncSchedulerThread;
|
||||
private RMNodeLabelsManager labelManager;
|
||||
private SchedulerHealth schedulerHealth = new SchedulerHealth();
|
||||
long lastNodeUpdateTime;
|
||||
volatile long lastNodeUpdateTime;
|
||||
|
||||
/**
|
||||
* EXPERT
|
||||
|
@ -925,7 +925,6 @@ public class CapacityScheduler extends
|
|||
}
|
||||
|
||||
@Override
|
||||
// Note: when AM asks to release container, we will acquire scheduler lock
|
||||
@Lock(Lock.NoLock.class)
|
||||
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
||||
List<ResourceRequest> ask, List<ContainerId> release,
|
||||
|
@ -1543,9 +1542,8 @@ public class CapacityScheduler extends
|
|||
}
|
||||
}
|
||||
|
||||
@Lock(CapacityScheduler.class)
|
||||
@Override
|
||||
protected synchronized void completedContainerInternal(
|
||||
protected void completedContainerInternal(
|
||||
RMContainer rmContainer, ContainerStatus containerStatus,
|
||||
RMContainerEventType event) {
|
||||
|
||||
|
@ -1954,7 +1952,7 @@ public class CapacityScheduler extends
|
|||
return this.schedulerHealth;
|
||||
}
|
||||
|
||||
private synchronized void setLastNodeUpdateTime(long time) {
|
||||
private void setLastNodeUpdateTime(long time) {
|
||||
this.lastNodeUpdateTime = time;
|
||||
}
|
||||
|
||||
|
|
|
@ -1078,9 +1078,33 @@ public class TestCapacityScheduler {
|
|||
RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
|
||||
client.registerApplicationMaster(request);
|
||||
|
||||
// Allocate a container
|
||||
List<ResourceRequest> asks = Collections.singletonList(
|
||||
ResourceRequest.newInstance(
|
||||
Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1));
|
||||
AllocateRequest allocateRequest =
|
||||
AllocateRequest.newInstance(0, 0.0f, asks, null, null);
|
||||
client.allocate(allocateRequest);
|
||||
|
||||
// Make sure the container is allocated in RM
|
||||
nm1.nodeHeartbeat(true);
|
||||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(applicationAttemptId, 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED, 10 * 1000));
|
||||
|
||||
// Acquire the container
|
||||
allocateRequest = AllocateRequest.newInstance(1, 0.0f, null, null, null);
|
||||
client.allocate(allocateRequest);
|
||||
|
||||
// Launch the container
|
||||
final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
RMContainer rmContainer = cs.getRMContainer(containerId2);
|
||||
rmContainer.handle(
|
||||
new RMContainerEvent(containerId2, RMContainerEventType.LAUNCHED));
|
||||
|
||||
// grab the scheduler lock from another thread
|
||||
// and verify an allocate call in this thread doesn't block on it
|
||||
final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
Thread otherThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
|
@ -1089,9 +1113,7 @@ public class TestCapacityScheduler {
|
|||
try {
|
||||
barrier.await();
|
||||
barrier.await();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (BrokenBarrierException e) {
|
||||
} catch (InterruptedException | BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
@ -1099,8 +1121,9 @@ public class TestCapacityScheduler {
|
|||
});
|
||||
otherThread.start();
|
||||
barrier.await();
|
||||
AllocateRequest allocateRequest =
|
||||
AllocateRequest.newInstance(0, 0.0f, null, null, null);
|
||||
List<ContainerId> release = Collections.singletonList(containerId2);
|
||||
allocateRequest =
|
||||
AllocateRequest.newInstance(2, 0.0f, null, release, null);
|
||||
client.allocate(allocateRequest);
|
||||
barrier.await();
|
||||
otherThread.join();
|
||||
|
|
Loading…
Reference in New Issue