YARN-4671. There is no need to acquire CS lock when completing a container. Contributed by Meng Ding.
This commit is contained in:
parent
a823c30a9b
commit
023c2d2e56
|
@ -1424,6 +1424,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-4748. ApplicationHistoryManagerOnTimelineStore should not
|
YARN-4748. ApplicationHistoryManagerOnTimelineStore should not
|
||||||
swallow exceptions on generateApplicationReport. (Li Lu via jianhe)
|
swallow exceptions on generateApplicationReport. (Li Lu via jianhe)
|
||||||
|
|
||||||
|
YARN-4671. There is no need to acquire CS lock when completing a container.
|
||||||
|
(Meng Ding via jianhe)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -228,7 +228,7 @@ public class CapacityScheduler extends
|
||||||
private AsyncScheduleThread asyncSchedulerThread;
|
private AsyncScheduleThread asyncSchedulerThread;
|
||||||
private RMNodeLabelsManager labelManager;
|
private RMNodeLabelsManager labelManager;
|
||||||
private SchedulerHealth schedulerHealth = new SchedulerHealth();
|
private SchedulerHealth schedulerHealth = new SchedulerHealth();
|
||||||
long lastNodeUpdateTime;
|
volatile long lastNodeUpdateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* EXPERT
|
* EXPERT
|
||||||
|
@ -928,7 +928,6 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
// Note: when AM asks to release container, we will acquire scheduler lock
|
|
||||||
@Lock(Lock.NoLock.class)
|
@Lock(Lock.NoLock.class)
|
||||||
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
||||||
List<ResourceRequest> ask, List<ContainerId> release,
|
List<ResourceRequest> ask, List<ContainerId> release,
|
||||||
|
@ -1546,9 +1545,8 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Lock(CapacityScheduler.class)
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void completedContainerInternal(
|
protected void completedContainerInternal(
|
||||||
RMContainer rmContainer, ContainerStatus containerStatus,
|
RMContainer rmContainer, ContainerStatus containerStatus,
|
||||||
RMContainerEventType event) {
|
RMContainerEventType event) {
|
||||||
|
|
||||||
|
@ -1957,7 +1955,7 @@ public class CapacityScheduler extends
|
||||||
return this.schedulerHealth;
|
return this.schedulerHealth;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void setLastNodeUpdateTime(long time) {
|
private void setLastNodeUpdateTime(long time) {
|
||||||
this.lastNodeUpdateTime = time;
|
this.lastNodeUpdateTime = time;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1078,9 +1078,33 @@ public class TestCapacityScheduler {
|
||||||
RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
|
RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
|
||||||
client.registerApplicationMaster(request);
|
client.registerApplicationMaster(request);
|
||||||
|
|
||||||
|
// Allocate a container
|
||||||
|
List<ResourceRequest> asks = Collections.singletonList(
|
||||||
|
ResourceRequest.newInstance(
|
||||||
|
Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1));
|
||||||
|
AllocateRequest allocateRequest =
|
||||||
|
AllocateRequest.newInstance(0, 0.0f, asks, null, null);
|
||||||
|
client.allocate(allocateRequest);
|
||||||
|
|
||||||
|
// Make sure the container is allocated in RM
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
ContainerId containerId2 =
|
||||||
|
ContainerId.newContainerId(applicationAttemptId, 2);
|
||||||
|
Assert.assertTrue(rm.waitForState(nm1, containerId2,
|
||||||
|
RMContainerState.ALLOCATED, 10 * 1000));
|
||||||
|
|
||||||
|
// Acquire the container
|
||||||
|
allocateRequest = AllocateRequest.newInstance(1, 0.0f, null, null, null);
|
||||||
|
client.allocate(allocateRequest);
|
||||||
|
|
||||||
|
// Launch the container
|
||||||
|
final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
RMContainer rmContainer = cs.getRMContainer(containerId2);
|
||||||
|
rmContainer.handle(
|
||||||
|
new RMContainerEvent(containerId2, RMContainerEventType.LAUNCHED));
|
||||||
|
|
||||||
// grab the scheduler lock from another thread
|
// grab the scheduler lock from another thread
|
||||||
// and verify an allocate call in this thread doesn't block on it
|
// and verify an allocate call in this thread doesn't block on it
|
||||||
final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
||||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||||
Thread otherThread = new Thread(new Runnable() {
|
Thread otherThread = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -1089,9 +1113,7 @@ public class TestCapacityScheduler {
|
||||||
try {
|
try {
|
||||||
barrier.await();
|
barrier.await();
|
||||||
barrier.await();
|
barrier.await();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException | BrokenBarrierException e) {
|
||||||
e.printStackTrace();
|
|
||||||
} catch (BrokenBarrierException e) {
|
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1099,8 +1121,9 @@ public class TestCapacityScheduler {
|
||||||
});
|
});
|
||||||
otherThread.start();
|
otherThread.start();
|
||||||
barrier.await();
|
barrier.await();
|
||||||
AllocateRequest allocateRequest =
|
List<ContainerId> release = Collections.singletonList(containerId2);
|
||||||
AllocateRequest.newInstance(0, 0.0f, null, null, null);
|
allocateRequest =
|
||||||
|
AllocateRequest.newInstance(2, 0.0f, null, release, null);
|
||||||
client.allocate(allocateRequest);
|
client.allocate(allocateRequest);
|
||||||
barrier.await();
|
barrier.await();
|
||||||
otherThread.join();
|
otherThread.join();
|
||||||
|
|
Loading…
Reference in New Issue