Merge MAPREDUCE-4099 amendment from trunk. ApplicationMaster will remove staging directory after the history service is stopped. (Contributed by Jason Lowe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1324868 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fac3cadf08
commit
50c13a44af
|
@ -211,6 +211,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-4040. History links should use hostname rather than IP address.
|
MAPREDUCE-4040. History links should use hostname rather than IP address.
|
||||||
(Bhallamudi Venkata Siva Kamesh via sseth)
|
(Bhallamudi Venkata Siva Kamesh via sseth)
|
||||||
|
|
||||||
|
MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory
|
||||||
|
after the history service is stopped. (Jason Lowe via sseth)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -285,6 +285,11 @@ public class MRAppMaster extends CompositeService {
|
||||||
addIfService(containerLauncher);
|
addIfService(containerLauncher);
|
||||||
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
|
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
|
||||||
|
|
||||||
|
// Add the staging directory cleaner before the history event handler but after
|
||||||
|
// the container allocator so the staging directory is cleaned after
|
||||||
|
// the history has been flushed but before unregistering with the RM.
|
||||||
|
addService(createStagingDirCleaningService());
|
||||||
|
|
||||||
// Add the JobHistoryEventHandler last so that it is properly stopped first.
|
// Add the JobHistoryEventHandler last so that it is properly stopped first.
|
||||||
// This will guarantee that all history-events are flushed before AM goes
|
// This will guarantee that all history-events are flushed before AM goes
|
||||||
// ahead with shutdown.
|
// ahead with shutdown.
|
||||||
|
@ -406,13 +411,6 @@ public class MRAppMaster extends CompositeService {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup staging directory
|
|
||||||
try {
|
|
||||||
cleanupStagingDir();
|
|
||||||
} catch(IOException io) {
|
|
||||||
LOG.warn("Failed to delete staging dir", io);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Stop all services
|
// Stop all services
|
||||||
// This will also send the final report to the ResourceManager
|
// This will also send the final report to the ResourceManager
|
||||||
|
@ -512,6 +510,10 @@ public class MRAppMaster extends CompositeService {
|
||||||
return this.jobHistoryEventHandler;
|
return this.jobHistoryEventHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected AbstractService createStagingDirCleaningService() {
|
||||||
|
return new StagingDirCleaningService();
|
||||||
|
}
|
||||||
|
|
||||||
protected Speculator createSpeculator(Configuration conf, AppContext context) {
|
protected Speculator createSpeculator(Configuration conf, AppContext context) {
|
||||||
Class<? extends Speculator> speculatorClass;
|
Class<? extends Speculator> speculatorClass;
|
||||||
|
|
||||||
|
@ -710,6 +712,22 @@ public class MRAppMaster extends CompositeService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class StagingDirCleaningService extends AbstractService {
|
||||||
|
StagingDirCleaningService() {
|
||||||
|
super(StagingDirCleaningService.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void stop() {
|
||||||
|
try {
|
||||||
|
cleanupStagingDir();
|
||||||
|
} catch (IOException io) {
|
||||||
|
LOG.error("Failed to cleanup staging dir: ", io);
|
||||||
|
}
|
||||||
|
super.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class RunningAppContext implements AppContext {
|
private class RunningAppContext implements AppContext {
|
||||||
|
|
||||||
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
|
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
|
||||||
|
|
|
@ -428,8 +428,12 @@ public class MRApp extends MRAppMaster {
|
||||||
@Override
|
@Override
|
||||||
protected ContainerAllocator createContainerAllocator(
|
protected ContainerAllocator createContainerAllocator(
|
||||||
ClientService clientService, final AppContext context) {
|
ClientService clientService, final AppContext context) {
|
||||||
return new ContainerAllocator(){
|
return new MRAppContainerAllocator();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected class MRAppContainerAllocator implements ContainerAllocator {
|
||||||
private int containerCount;
|
private int containerCount;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(ContainerAllocatorEvent event) {
|
public void handle(ContainerAllocatorEvent event) {
|
||||||
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
|
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
|
||||||
|
@ -452,7 +456,6 @@ public class MRApp extends MRAppMaster {
|
||||||
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
|
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
|
||||||
container, null));
|
container, null));
|
||||||
}
|
}
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,11 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.app;
|
package org.apache.hadoop.mapreduce.v2.app;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
@ -36,14 +35,11 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -237,71 +233,6 @@ public class TestMRApp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class MRAppTestCleanup extends MRApp {
|
|
||||||
boolean hasStopped;
|
|
||||||
boolean cleanedBeforeStopped;
|
|
||||||
|
|
||||||
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
|
|
||||||
String testName, boolean cleanOnStart) {
|
|
||||||
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
|
||||||
hasStopped = false;
|
|
||||||
cleanedBeforeStopped = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Job createJob(Configuration conf) {
|
|
||||||
UserGroupInformation currentUser = null;
|
|
||||||
try {
|
|
||||||
currentUser = UserGroupInformation.getCurrentUser();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new YarnException(e);
|
|
||||||
}
|
|
||||||
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
|
|
||||||
getDispatcher().getEventHandler(),
|
|
||||||
getTaskAttemptListener(), getContext().getClock(),
|
|
||||||
getCommitter(), isNewApiCommitter(),
|
|
||||||
currentUser.getUserName(), getContext());
|
|
||||||
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
|
||||||
|
|
||||||
getDispatcher().register(JobFinishEvent.Type.class,
|
|
||||||
createJobFinishEventHandler());
|
|
||||||
|
|
||||||
return newJob;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cleanupStagingDir() throws IOException {
|
|
||||||
cleanedBeforeStopped = !hasStopped;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void stop() {
|
|
||||||
hasStopped = true;
|
|
||||||
super.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void sysexit() {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testStagingCleanupOrder() throws Exception {
|
|
||||||
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
|
|
||||||
this.getClass().getName(), true);
|
|
||||||
JobImpl job = (JobImpl)app.submit(new Configuration());
|
|
||||||
app.waitForState(job, JobState.SUCCEEDED);
|
|
||||||
app.verifyCompleted();
|
|
||||||
|
|
||||||
int waitTime = 20 * 1000;
|
|
||||||
while (waitTime > 0 && !app.cleanedBeforeStopped) {
|
|
||||||
Thread.sleep(100);
|
|
||||||
waitTime -= 100;
|
|
||||||
}
|
|
||||||
Assert.assertTrue("Staging directory not cleaned before notifying RM",
|
|
||||||
app.cleanedBeforeStopped);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestMRApp t = new TestMRApp();
|
TestMRApp t = new TestMRApp();
|
||||||
t.testMapReduce();
|
t.testMapReduce();
|
||||||
|
@ -310,6 +241,5 @@ public class TestMRApp {
|
||||||
t.testCompletedMapsForReduceSlowstart();
|
t.testCompletedMapsForReduceSlowstart();
|
||||||
t.testJobError();
|
t.testJobError();
|
||||||
t.testCountersOnJobFinish();
|
t.testCountersOnJobFinish();
|
||||||
t.testStagingCleanupOrder();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -35,12 +36,21 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -103,4 +113,89 @@ import org.junit.Test;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class MRAppTestCleanup extends MRApp {
|
||||||
|
boolean stoppedContainerAllocator;
|
||||||
|
boolean cleanedBeforeContainerAllocatorStopped;
|
||||||
|
|
||||||
|
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
|
||||||
|
String testName, boolean cleanOnStart) {
|
||||||
|
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
||||||
|
stoppedContainerAllocator = false;
|
||||||
|
cleanedBeforeContainerAllocatorStopped = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Job createJob(Configuration conf) {
|
||||||
|
UserGroupInformation currentUser = null;
|
||||||
|
try {
|
||||||
|
currentUser = UserGroupInformation.getCurrentUser();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new YarnException(e);
|
||||||
|
}
|
||||||
|
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
|
||||||
|
getDispatcher().getEventHandler(),
|
||||||
|
getTaskAttemptListener(), getContext().getClock(),
|
||||||
|
getCommitter(), isNewApiCommitter(),
|
||||||
|
currentUser.getUserName(), getContext());
|
||||||
|
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
||||||
|
|
||||||
|
getDispatcher().register(JobFinishEvent.Type.class,
|
||||||
|
createJobFinishEventHandler());
|
||||||
|
|
||||||
|
return newJob;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ContainerAllocator createContainerAllocator(
|
||||||
|
ClientService clientService, AppContext context) {
|
||||||
|
return new TestCleanupContainerAllocator();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TestCleanupContainerAllocator extends AbstractService
|
||||||
|
implements ContainerAllocator {
|
||||||
|
private MRAppContainerAllocator allocator;
|
||||||
|
|
||||||
|
TestCleanupContainerAllocator() {
|
||||||
|
super(TestCleanupContainerAllocator.class.getName());
|
||||||
|
allocator = new MRAppContainerAllocator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(ContainerAllocatorEvent event) {
|
||||||
|
allocator.handle(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void stop() {
|
||||||
|
stoppedContainerAllocator = true;
|
||||||
|
super.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanupStagingDir() throws IOException {
|
||||||
|
cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void sysexit() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStagingCleanupOrder() throws Exception {
|
||||||
|
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
|
||||||
|
this.getClass().getName(), true);
|
||||||
|
JobImpl job = (JobImpl)app.submit(new Configuration());
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
app.verifyCompleted();
|
||||||
|
|
||||||
|
int waitTime = 20 * 1000;
|
||||||
|
while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
waitTime -= 100;
|
||||||
|
}
|
||||||
|
Assert.assertTrue("Staging directory not cleaned before notifying RM",
|
||||||
|
app.cleanedBeforeContainerAllocatorStopped);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue