MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else but just before ClientService to avoid race conditions during RM restart. Contributed by Jian He.

svn merge --ignore-ancestry -c 1521699 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1521700 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-09-11 00:39:21 +00:00
parent 4a8ac4d59d
commit 90f492e532
7 changed files with 170 additions and 62 deletions

View File

@ -49,6 +49,10 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit
subclass (Sandy Ryza)
MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else
but just before ClientService to avoid race conditions during RM restart.
(Jian He via vinodkv)
OPTIMIZATIONS
MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race

View File

@ -362,7 +362,10 @@ protected void serviceInit(final Configuration conf) throws Exception {
//service to handle requests from JobClient
clientService = createClientService(context);
addIfService(clientService);
// Init ClientService separately so that we stop it separately, since this
// service needs to wait some time before it stops so clients can know the
// final states
clientService.init(conf);
containerAllocator = createContainerAllocator(clientService, context);
@ -425,7 +428,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
// queued inside the JobHistoryEventHandler
addIfService(historyService);
}
super.serviceInit(conf);
} // end of init()
@ -534,14 +536,6 @@ public void shutDownJob() {
}
}
// TODO:currently just wait for some time so clients can know the
// final states. Will be removed once RM come on.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//if isLastAMRetry comes as true, should never set it to false
if ( !isLastAMRetry){
@ -556,6 +550,14 @@ public void shutDownJob() {
LOG.info("Calling stop for all the services");
MRAppMaster.this.stop();
// TODO: Stop ClientService last, since only ClientService should wait for
// some time so clients can know the final states. Will be removed once RM come on.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
clientService.stop();
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}
@ -1019,8 +1021,10 @@ protected void serviceStart() throws Exception {
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
// Start ClientService here, since it's not initialized if
// errorHappenedShutDown is true
clientService.start();
}
//start all the components
super.serviceStart();

View File

@ -1,28 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.client;
import java.net.InetSocketAddress;
public interface ClientService {
import org.apache.hadoop.service.Service;
InetSocketAddress getBindAddress();
public interface ClientService extends Service {
int getHttpPort();
public abstract InetSocketAddress getBindAddress();
public abstract int getHttpPort();
}

View File

@ -94,8 +94,7 @@
* jobclient (user facing).
*
*/
public class MRClientService extends AbstractService
implements ClientService {
public class MRClientService extends AbstractService implements ClientService {
static final Log LOG = LogFactory.getLog(MRClientService.class);
@ -106,7 +105,7 @@ public class MRClientService extends AbstractService
private AppContext appContext;
public MRClientService(AppContext appContext) {
super("MRClientService");
super(MRClientService.class.getName());
this.appContext = appContext;
this.protocolHandler = new MRClientProtocolHandler();
}

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -603,7 +604,7 @@ public void recoverTask(TaskAttemptContext taskContext)
@Override
protected ClientService createClientService(AppContext context) {
return new ClientService(){
return new MRClientService(context) {
@Override
public InetSocketAddress getBindAddress() {
return NetUtils.createSocketAddr("localhost:9876");

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Test;
public class TestMRAppComponentDependencies {
@Test(timeout = 20000)
public void testComponentStopOrder() throws Exception {
@SuppressWarnings("resource")
TestMRApp app = new TestMRApp(1, 1, true, this.getClass().getName(), true);
JobImpl job = (JobImpl) app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
int waitTime = 20 * 1000;
while (waitTime > 0 && app.numStops < 2) {
Thread.sleep(100);
waitTime -= 100;
}
// assert JobHistoryEventHandlerStopped and then clientServiceStopped
Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
Assert.assertEquals(2, app.clientServiceStopped);
}
private final class TestMRApp extends MRApp {
int JobHistoryEventHandlerStopped;
int clientServiceStopped;
int numStops;
public TestMRApp(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
JobHistoryEventHandlerStopped = 0;
clientServiceStopped = 0;
numStops = 0;
}
@Override
protected Job createJob(Configuration conf, JobStateInternal forcedState,
String diagnostic) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
Job newJob =
new TestJob(getJobId(), getAttemptID(), conf, getDispatcher()
.getEventHandler(), getTaskAttemptListener(), getContext()
.getClock(), getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext(), forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
@Override
protected ClientService createClientService(AppContext context) {
return new MRClientService(context) {
@Override
public void serviceStop() throws Exception {
numStops++;
clientServiceStopped = numStops;
super.serviceStop();
}
};
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new JobHistoryEventHandler(context, getStartCount()) {
@Override
public void serviceStop() throws Exception {
numStops++;
JobHistoryEventHandlerStopped = numStops;
super.serviceStop();
}
};
}
}
}

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@ -284,14 +285,12 @@ public boolean getTestIsLastAMRetry(){
private final class MRAppTestCleanup extends MRApp {
int stagingDirCleanedup;
int ContainerAllocatorStopped;
int JobHistoryEventHandlerStopped;
int numStops;
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
stagingDirCleanedup = 0;
ContainerAllocatorStopped = 0;
JobHistoryEventHandlerStopped = 0;
numStops = 0;
}
@ -318,26 +317,6 @@ protected Job createJob(Configuration conf, JobStateInternal forcedState,
return newJob;
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new TestJobHistoryEventHandler(context, getStartCount());
}
private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
public TestJobHistoryEventHandler(AppContext context, int startCount) {
super(context, startCount);
}
@Override
public void serviceStop() throws Exception {
numStops++;
JobHistoryEventHandlerStopped = numStops;
super.serviceStop();
}
}
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
@ -405,15 +384,13 @@ public void testStagingCleanupOrder() throws Exception {
app.verifyCompleted();
int waitTime = 20 * 1000;
while (waitTime > 0 && app.numStops < 3 ) {
while (waitTime > 0 && app.numStops < 2) {
Thread.sleep(100);
waitTime -= 100;
}
// assert JobHistoryEventHandlerStopped first, then
// ContainerAllocatorStopped, and then stagingDirCleanedup
Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
Assert.assertEquals(2, app.ContainerAllocatorStopped);
Assert.assertEquals(3, app.stagingDirCleanedup);
// assert ContainerAllocatorStopped and then tagingDirCleanedup
Assert.assertEquals(1, app.ContainerAllocatorStopped);
Assert.assertEquals(2, app.stagingDirCleanedup);
}
}