YARN-357. App submission should not be synchronized (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1443016 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2013-02-06 15:26:30 +00:00
parent c52a277687
commit d272918426
3 changed files with 98 additions and 3 deletions

View File

@ -300,6 +300,8 @@ Release 0.23.7 - UNRELEASED
OPTIMIZATIONS
YARN-357. App submission should not be synchronized (daryn)
BUG FIXES
YARN-343. Capacity Scheduler maximum-capacity value -1 is invalid (Xuan

View File

@ -228,7 +228,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
@SuppressWarnings("unchecked")
protected synchronized void submitApplication(
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime) {
ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null;

View File

@ -27,7 +27,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import junit.framework.Assert;
@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@ -44,28 +47,36 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestClientRMService {
@ -235,6 +246,88 @@ public class TestClientRMService {
rmService.renewDelegationToken(request);
}
@Test(timeout=4000)
public void testConcurrentAppSubmit()
throws IOException, InterruptedException, BrokenBarrierException {
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
final ApplicationId appId1 = getApplicationId(100);
final ApplicationId appId2 = getApplicationId(101);
final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1);
final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2);
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CyclicBarrier endBarrier = new CyclicBarrier(2);
@SuppressWarnings("rawtypes")
EventHandler eventHandler = new EventHandler() {
@Override
public void handle(Event rawEvent) {
if (rawEvent instanceof RMAppEvent) {
RMAppEvent event = (RMAppEvent) rawEvent;
if (event.getApplicationId().equals(appId1)) {
try {
startBarrier.await();
endBarrier.await();
} catch (BrokenBarrierException e) {
LOG.warn("Broken Barrier", e);
} catch (InterruptedException e) {
LOG.warn("Interrupted while awaiting barriers", e);
}
}
}
}
};
when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler);
final ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
// submit an app and wait for it to block while in app submission
Thread t = new Thread() {
@Override
public void run() {
try {
rmService.submitApplication(submitRequest1);
} catch (YarnRemoteException e) {}
}
};
t.start();
// submit another app, so go through while the first app is blocked
startBarrier.await();
rmService.submitApplication(submitRequest2);
endBarrier.await();
t.join();
}
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
String user = MockApps.newUserName();
String queue = MockApps.newQueue();
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
Resource resource = mock(Resource.class);
when(amContainerSpec.getResource()).thenReturn(resource);
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
when(submissionContext.getUser()).thenReturn(user);
when(submissionContext.getQueue()).thenReturn(queue);
when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
when(submissionContext.getApplicationId()).thenReturn(appId);
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(submissionContext);
return submitRequest;
}
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
throws IOException {
Dispatcher dispatcher = mock(Dispatcher.class);