From 90f492e5328b459a259e965f4b790f0a22ea46dc Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 11 Sep 2013 00:39:21 +0000 Subject: [PATCH] MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else but just before ClientService to avoid race conditions during RM restart. Contributed by Jian He. svn merge --ignore-ancestry -c 1521699 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1521700 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 26 ++-- .../v2/app/client/ClientService.java | 40 +++--- .../v2/app/client/MRClientService.java | 5 +- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 3 +- .../app/TestMRAppComponentDependencies.java | 121 ++++++++++++++++++ .../mapreduce/v2/app/TestStagingCleanup.java | 33 +---- 7 files changed, 170 insertions(+), 62 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 81c237fb13b..e2d69cca043 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -49,6 +49,10 @@ Release 2.1.1-beta - UNRELEASED MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit subclass (Sandy Ryza) + MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else + but just before ClientService to avoid race conditions during RM restart. + (Jian He via vinodkv) + OPTIMIZATIONS MAPREDUCE-5446. 
TestJobHistoryEvents and TestJobHistoryParsing have race diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index dd11ea6fb88..cba38c2bff5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -362,7 +362,10 @@ public class MRAppMaster extends CompositeService { //service to handle requests from JobClient clientService = createClientService(context); - addIfService(clientService); + // Init ClientService separately so that we stop it separately, since this + // service needs to wait some time before it stops so clients can know the + // final states + clientService.init(conf); containerAllocator = createContainerAllocator(clientService, context); @@ -425,7 +428,6 @@ public class MRAppMaster extends CompositeService { // queued inside the JobHistoryEventHandler addIfService(historyService); } - super.serviceInit(conf); } // end of init() @@ -534,14 +536,6 @@ public class MRAppMaster extends CompositeService { } } - // TODO:currently just wait for some time so clients can know the - // final states. Will be removed once RM come on. 
- try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - try { //if isLastAMRetry comes as true, should never set it to false if ( !isLastAMRetry){ @@ -556,6 +550,14 @@ public class MRAppMaster extends CompositeService { LOG.info("Calling stop for all the services"); MRAppMaster.this.stop(); + // TODO: Stop ClientService last, since only ClientService should wait for + // some time so clients can know the final states. Will be removed once the RM comes on. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + clientService.stop(); } catch (Throwable t) { LOG.warn("Graceful stop failed ", t); } @@ -1019,8 +1021,10 @@ public class MRAppMaster extends CompositeService { LOG.info("MRAppMaster launching normal, non-uberized, multi-container " + "job " + job.getID() + "."); } + // Start ClientService here, since it's not initialized if + // errorHappenedShutDown is true + clientService.start(); } - //start all the components super.serviceStart(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java index a4c0a0d14bd..4727ffd8710 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java @@ -1,28 +1,30 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. 
The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hadoop.mapreduce.v2.app.client; import java.net.InetSocketAddress; -public interface ClientService { +import org.apache.hadoop.service.Service; - InetSocketAddress getBindAddress(); +public interface ClientService extends Service { - int getHttpPort(); + public abstract InetSocketAddress getBindAddress(); + + public abstract int getHttpPort(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java index d36bf62fdf0..181fd3740a9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java @@ -94,8 +94,7 @@ import org.apache.hadoop.yarn.webapp.WebApps; * jobclient (user facing). 
* */ -public class MRClientService extends AbstractService - implements ClientService { +public class MRClientService extends AbstractService implements ClientService { static final Log LOG = LogFactory.getLog(MRClientService.class); @@ -106,7 +105,7 @@ public class MRClientService extends AbstractService private AppContext appContext; public MRClientService(AppContext appContext) { - super("MRClientService"); + super(MRClientService.class.getName()); this.appContext = appContext; this.protocolHandler = new MRClientProtocolHandler(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 6d1804f0627..76fd00ad848 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -55,6 +55,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.client.MRClientService; import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -603,7 +604,7 @@ public class MRApp extends MRAppMaster { @Override protected ClientService createClientService(AppContext context) { - return new ClientService(){ + return new MRClientService(context) { @Override public InetSocketAddress getBindAddress() { return NetUtils.createSocketAddr("localhost:9876"); 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java new file mode 100644 index 00000000000..52ee2c7017d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.v2.app; + +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.client.MRClientService; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.Test; + +public class TestMRAppComponentDependencies { + + @Test(timeout = 20000) + public void testComponentStopOrder() throws Exception { + @SuppressWarnings("resource") + TestMRApp app = new TestMRApp(1, 1, true, this.getClass().getName(), true); + JobImpl job = (JobImpl) app.submit(new Configuration()); + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + + int waitTime = 20 * 1000; + while (waitTime > 0 && app.numStops < 2) { + Thread.sleep(100); + waitTime -= 100; + } + + // assert JobHistoryEventHandlerStopped and then clientServiceStopped + Assert.assertEquals(1, app.JobHistoryEventHandlerStopped); + Assert.assertEquals(2, app.clientServiceStopped); + } + + private final class TestMRApp extends MRApp { + int JobHistoryEventHandlerStopped; + int clientServiceStopped; + int numStops; + + public TestMRApp(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart) { + super(maps, reduces, autoComplete, testName, cleanOnStart); + JobHistoryEventHandlerStopped = 0; + clientServiceStopped = 0; + numStops = 
0; + } + + @Override + protected Job createJob(Configuration conf, JobStateInternal forcedState, + String diagnostic) { + UserGroupInformation currentUser = null; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + Job newJob = + new TestJob(getJobId(), getAttemptID(), conf, getDispatcher() + .getEventHandler(), getTaskAttemptListener(), getContext() + .getClock(), getCommitter(), isNewApiCommitter(), + currentUser.getUserName(), getContext(), forcedState, diagnostic); + ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); + + getDispatcher().register(JobFinishEvent.Type.class, + createJobFinishEventHandler()); + + return newJob; + } + + @Override + protected ClientService createClientService(AppContext context) { + return new MRClientService(context) { + @Override + public void serviceStop() throws Exception { + numStops++; + clientServiceStopped = numStops; + super.serviceStop(); + } + }; + } + + @Override + protected EventHandler createJobHistoryHandler( + AppContext context) { + return new JobHistoryEventHandler(context, getStartCount()) { + @Override + public void serviceStop() throws Exception { + numStops++; + JobHistoryEventHandlerStopped = numStops; + super.serviceStop(); + } + }; + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index a0c0cb6c35f..496c1e35068 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -41,6 +41,7 
@@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.client.MRClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; @@ -284,14 +285,12 @@ import org.junit.Test; private final class MRAppTestCleanup extends MRApp { int stagingDirCleanedup; int ContainerAllocatorStopped; - int JobHistoryEventHandlerStopped; int numStops; public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) { super(maps, reduces, autoComplete, testName, cleanOnStart); stagingDirCleanedup = 0; ContainerAllocatorStopped = 0; - JobHistoryEventHandlerStopped = 0; numStops = 0; } @@ -318,26 +317,6 @@ import org.junit.Test; return newJob; } - @Override - protected EventHandler createJobHistoryHandler( - AppContext context) { - return new TestJobHistoryEventHandler(context, getStartCount()); - } - - private class TestJobHistoryEventHandler extends JobHistoryEventHandler { - - public TestJobHistoryEventHandler(AppContext context, int startCount) { - super(context, startCount); - } - - @Override - public void serviceStop() throws Exception { - numStops++; - JobHistoryEventHandlerStopped = numStops; - super.serviceStop(); - } - } - @Override protected ContainerAllocator createContainerAllocator( ClientService clientService, AppContext context) { @@ -405,15 +384,13 @@ import org.junit.Test; app.verifyCompleted(); int waitTime = 20 * 1000; - while (waitTime > 0 && app.numStops < 3 ) { + while (waitTime > 0 && app.numStops < 2) { Thread.sleep(100); waitTime -= 100; } - // assert JobHistoryEventHandlerStopped first, then - // ContainerAllocatorStopped, and then stagingDirCleanedup 
- Assert.assertEquals(1, app.JobHistoryEventHandlerStopped); - Assert.assertEquals(2, app.ContainerAllocatorStopped); - Assert.assertEquals(3, app.stagingDirCleanedup); + // assert ContainerAllocatorStopped and then stagingDirCleanedup + Assert.assertEquals(1, app.ContainerAllocatorStopped); + Assert.assertEquals(2, app.stagingDirCleanedup); } }