YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when AMs heartbeat in. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605616 13f79535-47bb-0310-9956-ffa450edef68
parent e3612e4428
commit 9571db19eb
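The core of the change below: getNumClusterNodes() was a synchronized method, so every AM heartbeat that asked for the cluster node count had to acquire the CapacityScheduler's monitor lock and could stall behind a long scheduling pass. The patch makes the counter an AtomicInteger, whose reads are lock-free. A minimal standalone before/after sketch (class and method names here are illustrative, not the real CapacityScheduler API):

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: contrasts the locked counter the patch removes
// with the lock-free counter it introduces.
class NodeCountSketch {
  // Before: reads contend for the object's monitor, so a caller blocks
  // whenever another thread (e.g. a scheduling pass) holds the lock.
  private int lockedCount = 0;
  synchronized int getLockedCount() { return lockedCount; }
  synchronized void addNodeLocked() { ++lockedCount; }

  // After: get() is a volatile read that never waits on the monitor, so
  // heartbeats proceed even while the scheduler lock is held elsewhere.
  private final AtomicInteger count = new AtomicInteger(0);
  int getCount() { return count.get(); }
  int addNode() { return count.incrementAndGet(); }
}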
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -192,6 +192,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2152. Added missing information into ContainerTokenIdentifier so that
     NodeManagers can report the same to RM when RM restarts. (Jian He via vinodkv)
 
+    YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when
+    AMs heartbeat in. (Jason Lowe via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -180,7 +182,7 @@ public class CapacityScheduler extends
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
-  private int numNodeManagers = 0;
+  private AtomicInteger numNodeManagers = new AtomicInteger(0);
 
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
@@ -236,8 +238,8 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public synchronized int getNumClusterNodes() {
-    return numNodeManagers;
+  public int getNumClusterNodes() {
+    return numNodeManagers.get();
   }
 
   @Override
@@ -953,11 +955,11 @@ public class CapacityScheduler extends
         usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
-    ++numNodeManagers;
+    int numNodes = numNodeManagers.incrementAndGet();
     LOG.info("Added node " + nodeManager.getNodeAddress() +
         " clusterResource: " + clusterResource);
 
-    if (scheduleAsynchronously && numNodeManagers == 1) {
+    if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
   }
@@ -969,9 +971,9 @@ public class CapacityScheduler extends
     }
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
-    --numNodeManagers;
+    int numNodes = numNodeManagers.decrementAndGet();
 
-    if (scheduleAsynchronously && numNodeManagers == 0) {
+    if (scheduleAsynchronously && numNodes == 0) {
       asyncSchedulerThread.suspendSchedule();
     }
 
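Note the pattern in the two hunks above: the value returned by incrementAndGet()/decrementAndGet() is captured in a local (numNodes) and compared, rather than re-reading the counter, so the comparison sees exactly the count this update produced even if other node events race in. A hypothetical standalone sketch of the difference (not code from the patch):

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: why the patch compares the return value of the
// atomic update instead of re-reading the counter afterwards.
class FirstNodeSketch {
  private final AtomicInteger n = new AtomicInteger(0);

  void racyAdd() {
    n.incrementAndGet();
    // Another thread can change n between the increment and this read,
    // so the "first node" branch may be missed or taken twice.
    if (n.get() == 1) { /* start async scheduling (unreliable) */ }
  }

  void safeAdd() {
    int v = n.incrementAndGet();
    // v is exactly the value this increment produced; the branch fires
    // once, for whichever thread performed the 0 -> 1 transition.
    if (v == 1) { /* start async scheduling (reliable) */ }
  }
}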
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -25,15 +25,29 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,13 +60,20 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -686,4 +707,125 @@ public class TestCapacityScheduler {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
+    final YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MyContainerManager containerManager = new MyContainerManager();
+    final MockRMWithAMS rm =
+        new MockRMWithAMS(conf, containerManager);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("localhost:1234", 5120);
+
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(2);
+    acls.put(ApplicationAccessType.VIEW_APP, "*");
+    RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
+
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
+    int msecToWait = 10000;
+    int msecToSleep = 100;
+    while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
+        && msecToWait > 0) {
+      LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
+          + "Current state is " + attempt.getAppAttemptState());
+      Thread.sleep(msecToSleep);
+      msecToWait -= msecToSleep;
+    }
+    Assert.assertEquals(attempt.getAppAttemptState(),
+        RMAppAttemptState.LAUNCHED);
+
+    // Create a client to the RM.
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
+    Credentials credentials = containerManager.getContainerCredentials();
+    final InetSocketAddress rmBindAddress =
+        rm.getApplicationMasterService().getBindAddress();
+    Token<? extends TokenIdentifier> amRMToken =
+        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+          credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+    ApplicationMasterProtocol client =
+        currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) rpc.getProxy(
+                ApplicationMasterProtocol.class, rmBindAddress, conf);
+          }
+        });
+
+    RegisterApplicationMasterRequest request =
+        RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
+    client.registerApplicationMaster(request);
+
+    // grab the scheduler lock from another thread
+    // and verify an allocate call in this thread doesn't block on it
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    Thread otherThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        synchronized(cs) {
+          try {
+            barrier.await();
+            barrier.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } catch (BrokenBarrierException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    });
+    otherThread.start();
+    barrier.await();
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0.0f, null, null, null);
+    client.allocate(allocateRequest);
+    barrier.await();
+    otherThread.join();
+
+    rm.stop();
+  }
+
+  @Test
+  public void testNumClusterNodes() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(conf);
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    cs.setRMContext(rmContext);
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    cs.init(csConf);
+    cs.start();
+    assertEquals(0, cs.getNumClusterNodes());
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n2));
+    assertEquals(2, cs.getNumClusterNodes());
+
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(1, cs.getNumClusterNodes());
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    assertEquals(2, cs.getNumClusterNodes());
+    cs.handle(new NodeRemovedSchedulerEvent(n2));
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(0, cs.getNumClusterNodes());
+
+    cs.stop();
+  }
 }
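For readers unfamiliar with the barrier idiom in testAllocateDoesNotBlockOnSchedulerLock above: a helper thread pins the scheduler lock between two CyclicBarrier rendezvous while the main thread issues the allocate call; if allocate still needed that lock, the call would hang and the test's 30-second timeout would fail it. A self-contained sketch of the idiom (names are illustrative):

import java.util.concurrent.CyclicBarrier;

// Illustrative, standalone version of the lock-hold idiom used by the test.
public class LockHoldIdiom {
  public static void main(String[] args) throws Exception {
    final Object lock = new Object();  // stands in for the scheduler
    final CyclicBarrier barrier = new CyclicBarrier(2);
    Thread holder = new Thread(new Runnable() {
      @Override
      public void run() {
        synchronized (lock) {
          try {
            barrier.await();  // rendezvous 1: signal that the lock is held
            barrier.await();  // rendezvous 2: wait until the check is done
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    });
    holder.start();
    barrier.await();  // proceed only once the holder owns the lock
    // ...call the operation under test here; if it tried to acquire
    // 'lock', this thread would block and a test timeout would fire...
    barrier.await();  // release the holder
    holder.join();
  }
}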