svn merge -c 1426536 FIXES: MAPREDUCE-4813. AM timing out during job commit (jlowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1426539 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-12-28 15:06:15 +00:00
parent 4b5a93bc1d
commit 52742c9a65
36 changed files with 1465 additions and 514 deletions

View File

@ -494,6 +494,8 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4902. Fix typo "receievd" should be "received" in log output
(Albert Chu via jlowe)
MAPREDUCE-4813. AM timing out during job commit (jlowe via bobby)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -42,12 +41,12 @@ public class MapTaskAttemptImpl extends TaskAttemptImpl {
EventHandler eventHandler, Path jobFile,
int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
AppContext appContext) {
super(taskId, attempt, eventHandler,
taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
committer, jobToken, credentials, clock, appContext);
jobToken, credentials, clock, appContext);
this.splitInfo = splitInfo;
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@ -40,12 +39,12 @@ public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
public ReduceTaskAttemptImpl(TaskId id, int attempt,
EventHandler eventHandler, Path jobFile, int partition,
int numMapTasks, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
AppContext appContext) {
super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition,
conf, new String[] {}, committer, jobToken, credentials, clock,
conf, new String[] {}, jobToken, credentials, clock,
appContext);
this.numMapTasks = numMapTasks;
}

View File

@ -62,6 +62,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@ -87,8 +90,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
@ -162,7 +163,7 @@ public class MRAppMaster extends CompositeService {
private Recovery recoveryServ;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private TaskCleaner taskCleaner;
private EventHandler<CommitterEvent> committerEventHandler;
private Speculator speculator;
private TaskAttemptListener taskAttemptListener;
private JobTokenSecretManager jobTokenSecretManager =
@ -268,8 +269,8 @@ public class MRAppMaster extends CompositeService {
addIfService(taskAttemptListener);
//service to do the task cleanup
taskCleaner = createTaskCleaner(context);
addIfService(taskCleaner);
committerEventHandler = createCommitterEventHandler(context, committer);
addIfService(committerEventHandler);
//service to handle requests from JobClient
clientService = createClientService(context);
@ -288,7 +289,7 @@ public class MRAppMaster extends CompositeService {
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
dispatcher.register(CommitterEventType.class, committerEventHandler);
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
@ -493,7 +494,7 @@ public class MRAppMaster extends CompositeService {
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
completedTasksFromPreviousRun, metrics, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@ -585,8 +586,9 @@ public class MRAppMaster extends CompositeService {
return lis;
}
protected TaskCleaner createTaskCleaner(AppContext context) {
return new TaskCleanerImpl(context);
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) {
return new CommitterEventHandler(context, committer);
}
protected ContainerAllocator createContainerAllocator(

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class CommitterEvent extends AbstractEvent<CommitterEventType> {
public CommitterEvent(CommitterEventType type) {
super(type);
}
}

View File

@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class CommitterEventHandler extends AbstractService
implements EventHandler<CommitterEvent> {
private static final Log LOG =
LogFactory.getLog(CommitterEventHandler.class);
private final AppContext context;
private final OutputCommitter committer;
private ThreadPoolExecutor launcherPool;
private Thread eventHandlingThread;
private BlockingQueue<CommitterEvent> eventQueue =
new LinkedBlockingQueue<CommitterEvent>();
private final AtomicBoolean stopped;
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
public CommitterEventHandler(AppContext context, OutputCommitter committer) {
super("CommitterEventHandler");
this.context = context;
this.committer = committer;
this.stopped = new AtomicBoolean(false);
}
@Override
public void init(Configuration conf) {
super.init(conf);
commitThreadCancelTimeoutMs = conf.getInt(
MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
}
@Override
public void start() {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("CommitterEvent Processor #%d")
.build();
launcherPool = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
CommitterEvent event = null;
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
// the events from the queue are handled in parallel
// using a thread pool
launcherPool.execute(new EventProcessor(event)); }
}
});
eventHandlingThread.setName("CommitterEvent Handler");
eventHandlingThread.start();
super.start();
}
@Override
public void handle(CommitterEvent event) {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
throw new YarnException(e);
}
}
@Override
public void stop() {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
eventHandlingThread.interrupt();
launcherPool.shutdown();
super.stop();
}
private synchronized void jobCommitStarted() throws IOException {
if (jobCommitThread != null) {
throw new IOException("Commit while another commit thread active: "
+ jobCommitThread.toString());
}
jobCommitThread = Thread.currentThread();
}
private synchronized void jobCommitEnded() {
if (jobCommitThread == Thread.currentThread()) {
jobCommitThread = null;
notifyAll();
}
}
private synchronized void cancelJobCommit() {
Thread threadCommitting = jobCommitThread;
if (threadCommitting != null && threadCommitting.isAlive()) {
LOG.info("Canceling commit");
threadCommitting.interrupt();
// wait up to configured timeout for commit thread to finish
long now = context.getClock().getTime();
long timeoutTimestamp = now + commitThreadCancelTimeoutMs;
try {
while (jobCommitThread == threadCommitting
&& now > timeoutTimestamp) {
wait(now - timeoutTimestamp);
now = context.getClock().getTime();
}
} catch (InterruptedException e) {
}
}
}
private class EventProcessor implements Runnable {
private CommitterEvent event;
EventProcessor(CommitterEvent event) {
this.event = event;
}
@Override
public void run() {
LOG.info("Processing the event " + event.toString());
switch (event.getType()) {
case JOB_SETUP:
handleJobSetup((CommitterJobSetupEvent) event);
break;
case JOB_COMMIT:
handleJobCommit((CommitterJobCommitEvent) event);
break;
case JOB_ABORT:
handleJobAbort((CommitterJobAbortEvent) event);
break;
case TASK_ABORT:
handleTaskAbort((CommitterTaskAbortEvent) event);
break;
default:
throw new YarnException("Unexpected committer event "
+ event.toString());
}
}
@SuppressWarnings("unchecked")
protected void handleJobSetup(CommitterJobSetupEvent event) {
try {
committer.setupJob(event.getJobContext());
context.getEventHandler().handle(
new JobSetupCompletedEvent(event.getJobID()));
} catch (Exception e) {
LOG.warn("Job setup failed", e);
context.getEventHandler().handle(new JobSetupFailedEvent(
event.getJobID(), StringUtils.stringifyException(e)));
}
}
@SuppressWarnings("unchecked")
protected void handleJobCommit(CommitterJobCommitEvent event) {
try {
jobCommitStarted();
committer.commitJob(event.getJobContext());
context.getEventHandler().handle(
new JobCommitCompletedEvent(event.getJobID()));
} catch (Exception e) {
LOG.error("Could not commit job", e);
context.getEventHandler().handle(
new JobCommitFailedEvent(event.getJobID(),
StringUtils.stringifyException(e)));
} finally {
jobCommitEnded();
}
}
@SuppressWarnings("unchecked")
protected void handleJobAbort(CommitterJobAbortEvent event) {
cancelJobCommit();
try {
committer.abortJob(event.getJobContext(), event.getFinalState());
} catch (Exception e) {
LOG.warn("Could not abort job", e);
}
context.getEventHandler().handle(new JobAbortCompletedEvent(
event.getJobID(), event.getFinalState()));
}
@SuppressWarnings("unchecked")
protected void handleTaskAbort(CommitterTaskAbortEvent event) {
try {
committer.abortTask(event.getAttemptContext());
} catch (Exception e) {
LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
}
context.getEventHandler().handle(
new TaskAttemptEvent(event.getAttemptID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
}
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
public enum CommitterEventType {
JOB_SETUP,
JOB_COMMIT,
JOB_ABORT,
TASK_ABORT
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class CommitterJobAbortEvent extends CommitterEvent {
private JobId jobID;
private JobContext jobContext;
private JobStatus.State finalState;
public CommitterJobAbortEvent(JobId jobID, JobContext jobContext,
JobStatus.State finalState) {
super(CommitterEventType.JOB_ABORT);
this.jobID = jobID;
this.jobContext = jobContext;
this.finalState = finalState;
}
public JobId getJobID() {
return jobID;
}
public JobContext getJobContext() {
return jobContext;
}
public JobStatus.State getFinalState() {
return finalState;
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class CommitterJobCommitEvent extends CommitterEvent {
private JobId jobID;
private JobContext jobContext;
public CommitterJobCommitEvent(JobId jobID, JobContext jobContext) {
super(CommitterEventType.JOB_COMMIT);
this.jobID = jobID;
this.jobContext = jobContext;
}
public JobId getJobID() {
return jobID;
}
public JobContext getJobContext() {
return jobContext;
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class CommitterJobSetupEvent extends CommitterEvent {
private JobId jobID;
private JobContext jobContext;
public CommitterJobSetupEvent(JobId jobID, JobContext jobContext) {
super(CommitterEventType.JOB_SETUP);
this.jobID = jobID;
this.jobContext = jobContext;
}
public JobId getJobID() {
return jobID;
}
public JobContext getJobContext() {
return jobContext;
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public class CommitterTaskAbortEvent extends CommitterEvent {
private final TaskAttemptId attemptID;
private final TaskAttemptContext attemptContext;
public CommitterTaskAbortEvent(TaskAttemptId attemptID,
TaskAttemptContext attemptContext) {
super(CommitterEventType.TASK_ABORT);
this.attemptID = attemptID;
this.attemptContext = attemptContext;
}
public TaskAttemptId getAttemptID() {
return attemptID;
}
public TaskAttemptContext getAttemptContext() {
return attemptContext;
}
}

View File

@ -16,5 +16,5 @@
* limitations under the License.
*/
@InterfaceAudience.Private
package org.apache.hadoop.mapreduce.v2.app.taskclean;
package org.apache.hadoop.mapreduce.v2.app.commit;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -20,11 +20,15 @@ package org.apache.hadoop.mapreduce.v2.app.job;
public enum JobStateInternal {
NEW,
SETUP,
INITED,
RUNNING,
COMMITTING,
SUCCEEDED,
FAIL_ABORT,
FAILED,
KILL_WAIT,
KILL_ABORT,
KILLED,
ERROR
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class JobAbortCompletedEvent extends JobEvent {
private JobStatus.State finalState;
public JobAbortCompletedEvent(JobId jobID, JobStatus.State finalState) {
super(jobID, JobEventType.JOB_ABORT_COMPLETED);
this.finalState = finalState;
}
public JobStatus.State getFinalState() {
return finalState;
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class JobCommitCompletedEvent extends JobEvent {
public JobCommitCompletedEvent(JobId jobID) {
super(jobID, JobEventType.JOB_COMMIT_COMPLETED);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class JobCommitFailedEvent extends JobEvent {
private String message;
public JobCommitFailedEvent(JobId jobID, String message) {
super(jobID, JobEventType.JOB_COMMIT_FAILED);
this.message = message;
}
public String getMessage() {
return this.message;
}
}

View File

@ -35,6 +35,13 @@ public enum JobEventType {
JOB_MAP_TASK_RESCHEDULED,
JOB_TASK_ATTEMPT_COMPLETED,
//Producer:CommitterEventHandler
JOB_SETUP_COMPLETED,
JOB_SETUP_FAILED,
JOB_COMMIT_COMPLETED,
JOB_COMMIT_FAILED,
JOB_ABORT_COMPLETED,
//Producer:Job
JOB_COMPLETED,

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class JobSetupCompletedEvent extends JobEvent {
public JobSetupCompletedEvent(JobId jobID) {
super(jobID, JobEventType.JOB_SETUP_COMPLETED);
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class JobSetupFailedEvent extends JobEvent {
private String message;
public JobSetupFailedEvent(JobId jobID, String message) {
super(jobID, JobEventType.JOB_SETUP_FAILED);
this.message = message;
}
public String getMessage() {
return message;
}
}

View File

@ -47,7 +47,6 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -77,14 +76,20 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobCommitEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobSetupEvent;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@ -138,7 +143,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final Clock clock;
private final JobACLsManager aclsManager;
private final String username;
private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs;
private float setupWeight = 0.05f;
private float cleanupWeight = 0.05f;
@ -176,6 +180,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private Counters fullCounters = null;
private Counters finalMapCounters = null;
private Counters finalReduceCounters = null;
// FIXME:
//
// Can then replace task-level uber counters (MR-2424) with job-level ones
@ -245,7 +250,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobStateInternal.INITED, JobStateInternal.RUNNING,
.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
JobEventType.JOB_START,
new StartTransition())
.addTransition(JobStateInternal.INITED, JobStateInternal.KILLED,
@ -257,19 +262,43 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Ignore-able events
.addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_UPDATED_NODES)
// Transitions from SETUP state
.addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
JobEventType.JOB_SETUP_COMPLETED,
new SetupCompletedTransition())
.addTransition(JobStateInternal.SETUP, JobStateInternal.FAIL_ABORT,
JobEventType.JOB_SETUP_FAILED,
new SetupFailedTransition())
.addTransition(JobStateInternal.SETUP, JobStateInternal.KILL_ABORT,
JobEventType.JOB_KILL,
new KilledDuringSetupTransition())
.addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
JobEventType.JOB_UPDATED_NODES)
// Transitions from RUNNING state
.addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition
(JobStateInternal.RUNNING,
EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
EnumSet.of(JobStateInternal.RUNNING,
JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT),
JobEventType.JOB_TASK_COMPLETED,
new TaskCompletedTransition())
.addTransition
(JobStateInternal.RUNNING,
EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
EnumSet.of(JobStateInternal.RUNNING,
JobStateInternal.COMMITTING),
JobEventType.JOB_COMPLETED,
new JobNoTasksCompletedTransition())
.addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT,
@ -296,7 +325,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Transitions from KILL_WAIT state.
.addTransition
(JobStateInternal.KILL_WAIT,
EnumSet.of(JobStateInternal.KILL_WAIT, JobStateInternal.KILLED),
EnumSet.of(JobStateInternal.KILL_WAIT,
JobStateInternal.KILL_ABORT),
JobEventType.JOB_TASK_COMPLETED,
new KillWaitTaskCompletedTransition())
.addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
@ -318,6 +348,35 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from COMMITTING state
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.SUCCEEDED,
JobEventType.JOB_COMMIT_COMPLETED,
new CommitSucceededTransition())
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.FAIL_ABORT,
JobEventType.JOB_COMMIT_FAILED,
new CommitFailedTransition())
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.KILL_ABORT,
JobEventType.JOB_KILL,
new KilledDuringCommitTransition())
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.COMMITTING,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.COMMITTING,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.COMMITTING,
EnumSet.of(JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from SUCCEEDED state
.addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
@ -334,6 +393,61 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from FAIL_ABORT state
.addTransition(JobStateInternal.FAIL_ABORT,
JobStateInternal.FAIL_ABORT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.FAIL_ABORT,
JobStateInternal.FAIL_ABORT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
JobEventType.JOB_ABORT_COMPLETED,
new JobAbortCompletedTransition())
.addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.KILLED,
JobEventType.JOB_KILL,
new KilledDuringAbortTransition())
.addTransition(JobStateInternal.FAIL_ABORT,
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.FAIL_ABORT,
JobStateInternal.FAIL_ABORT,
EnumSet.of(JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_COMPLETED,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED))
// Transitions from KILL_ABORT state
.addTransition(JobStateInternal.KILL_ABORT,
JobStateInternal.KILL_ABORT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.KILL_ABORT,
JobStateInternal.KILL_ABORT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobStateInternal.KILL_ABORT, JobStateInternal.KILLED,
JobEventType.JOB_ABORT_COMPLETED,
new JobAbortCompletedTransition())
.addTransition(JobStateInternal.KILL_ABORT, JobStateInternal.KILLED,
JobEventType.JOB_KILL,
new KilledDuringAbortTransition())
.addTransition(JobStateInternal.KILL_ABORT,
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.KILL_ABORT,
JobStateInternal.KILL_ABORT,
EnumSet.of(JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_SETUP_COMPLETED,
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED))
// Transitions from FAILED state
.addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
@ -351,7 +465,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_TASK_COMPLETED,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_SETUP_COMPLETED,
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_ABORT_COMPLETED))
// Transitions from KILLED state
.addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
@ -366,8 +485,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Ignore-able events
.addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_START,
JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_SETUP_COMPLETED,
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_ABORT_COMPLETED))
// No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition(
@ -381,6 +506,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_SETUP_COMPLETED,
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_ABORT_COMPLETED,
JobEventType.INTERNAL_ERROR))
.addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
@ -417,7 +547,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
OutputCommitter committer, boolean newApiCommitter, String userName,
boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos, AppContext appContext) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = jobId;
@ -442,7 +572,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.fsTokens = fsTokenCredentials;
this.jobTokenSecretManager = jobTokenSecretManager;
this.committer = committer;
this.aclsManager = new JobACLsManager(conf);
this.username = System.getProperty("user.name");
@ -461,11 +590,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return jobId;
}
// Getter methods that make unit testing easier (package-scoped)
OutputCommitter getCommitter() {
return this.committer;
}
EventHandler getEventHandler() {
return this.eventHandler;
}
@ -751,9 +875,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
private static JobState getExternalState(JobStateInternal smState) {
if (smState == JobStateInternal.KILL_WAIT) {
switch (smState) {
case KILL_WAIT:
case KILL_ABORT:
return JobState.KILLED;
} else {
case SETUP:
case COMMITTING:
return JobState.RUNNING;
case FAIL_ABORT:
return JobState.FAILED;
default:
return JobState.valueOf(smState.name());
}
}
@ -799,22 +930,15 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return FileSystem.get(conf);
}
static JobStateInternal checkJobCompleteSuccess(JobImpl job) {
// check for Job success
if (job.completedTaskCount == job.tasks.size()) {
try {
// Commit job & do cleanup
job.getCommitter().commitJob(job.getJobContext());
} catch (IOException e) {
LOG.error("Could not do commit for Job", e);
job.addDiagnostic("Job commit failed: " + e.getMessage());
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
return job.finished(JobStateInternal.FAILED);
}
job.logJobHistoryFinishedEvent();
return job.finished(JobStateInternal.SUCCEEDED);
protected JobStateInternal checkReadyForCommit() {
JobStateInternal currentState = getInternalState();
if (completedTaskCount == tasks.size()
&& currentState == JobStateInternal.RUNNING) {
eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
return JobStateInternal.COMMITTING;
}
return null;
// return the current state as job not ready to commit yet
return getInternalState();
}
JobStateInternal finished(JobStateInternal finalState) {
@ -1104,25 +1228,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.allowedReduceFailuresPercent =
job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
// do the setup
job.committer.setupJob(job.jobContext);
job.setupProgress = 1.0f;
// create the Tasks but don't start them yet
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
job.metrics.endPreparingJob(job);
return JobStateInternal.INITED;
//TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
} catch (IOException e) {
LOG.warn("Job init failed", e);
job.metrics.endPreparingJob(job);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
job.metrics.endPreparingJob(job);
return job.finished(JobStateInternal.FAILED);
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
return JobStateInternal.FAILED;
}
}
@ -1174,7 +1294,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens,
job.jobToken, job.fsTokens,
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
@ -1191,7 +1311,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.eventHandler,
job.remoteJobConfFile,
job.conf, job.numMapTasks,
job.taskAttemptListener, job.committer, job.jobToken,
job.taskAttemptListener, job.jobToken,
job.fsTokens, job.clock,
job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
@ -1224,6 +1344,35 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
} // end of InitTransition
private static class SetupCompletedTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.setupProgress = 1.0f;
job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
job.scheduleTasks(job.reduceTasks);
// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
job.eventHandler.handle(new JobEvent(job.jobId,
JobEventType.JOB_COMPLETED));
}
}
}
private static class SetupFailedTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.metrics.endRunningJob(job);
job.addDiagnostic("Job setup failed : "
+ ((JobSetupFailedEvent) event).getMessage());
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
}
public static class StartTransition
implements SingleArcTransition<JobImpl, JobEvent> {
/**
@ -1233,43 +1382,45 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
@Override
public void transition(JobImpl job, JobEvent event) {
job.startTime = job.clock.getTime();
job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
job.scheduleTasks(job.reduceTasks);
JobInitedEvent jie =
new JobInitedEvent(job.oldJobId,
job.startTime,
job.numMapTasks, job.numReduceTasks,
job.getState().toString(),
job.isUber()); //Will transition to state running. Currently in INITED
job.isUber());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
job.appSubmitTime, job.startTime);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
job.metrics.runningJob(job);
// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
job.eventHandler.handle(new JobEvent(job.jobId, JobEventType.JOB_COMPLETED));
}
job.eventHandler.handle(new CommitterJobSetupEvent(
job.jobId, job.jobContext));
}
}
protected void abortJob(
org.apache.hadoop.mapreduce.JobStatus.State finalState) {
try {
committer.abortJob(jobContext, finalState);
} catch (IOException e) {
LOG.warn("Could not abortJob", e);
private void unsuccessfulFinish(JobStateInternal finalState) {
if (finishTime == 0) setFinishTime();
cleanupProgress = 1.0f;
JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
new JobUnsuccessfulCompletionEvent(oldJobId,
finishTime,
succeededMapTaskCount,
succeededReduceTaskCount,
finalState.toString());
eventHandler.handle(new JobHistoryEvent(jobId,
unsuccessfulJobEvent));
finished(finalState);
}
private static class JobAbortCompletedTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
JobStateInternal finalState = JobStateInternal.valueOf(
((JobAbortCompletedEvent) event).getFinalState().name());
job.unsuccessfulFinish(finalState);
}
if (finishTime == 0) setFinishTime();
cleanupProgress = 1.0f;
JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
new JobUnsuccessfulCompletionEvent(oldJobId,
finishTime,
succeededMapTaskCount,
succeededReduceTaskCount,
finalState.toString());
eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
}
// JobFinishedEvent triggers the move of the history file out of the staging
@ -1343,9 +1494,22 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
job.addDiagnostic("Job received Kill in INITED state.");
job.finished(JobStateInternal.KILLED);
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
}
}
private static class KilledDuringSetupTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.metrics.endRunningJob(job);
job.addDiagnostic("Job received kill in SETUP state.");
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
}
}
@ -1470,10 +1634,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
taskKilled(job, task);
}
return checkJobForCompletion(job);
return checkJobAfterTaskCompletion(job);
}
protected JobStateInternal checkJobForCompletion(JobImpl job) {
protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
//check for Job failure
if (job.failedMapTaskCount*100 >
job.allowedMapFailuresPercent*job.numMapTasks ||
@ -1486,17 +1650,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
" failedReduces:" + job.failedReduceTaskCount;
LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg);
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
return job.finished(JobStateInternal.FAILED);
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
return JobStateInternal.FAIL_ABORT;
}
JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
if (jobCompleteSuccess != null) {
return jobCompleteSuccess;
}
//return the current state, Job not finished yet
return job.getInternalState();
return job.checkReadyForCommit();
}
private void taskSucceeded(JobImpl job, Task task) {
@ -1529,18 +1689,52 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
// Transition class for handling jobs with no tasks
static class JobNoTasksCompletedTransition implements
private static class JobNoTasksCompletedTransition implements
MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
if (jobCompleteSuccess != null) {
return jobCompleteSuccess;
}
// Return the current state, Job not finished yet
return job.getInternalState();
return job.checkReadyForCommit();
}
}
private static class CommitSucceededTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.logJobHistoryFinishedEvent();
job.finished(JobStateInternal.SUCCEEDED);
}
}
private static class CommitFailedTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
JobCommitFailedEvent jcfe = (JobCommitFailedEvent)event;
job.addDiagnostic("Job commit failed: " + jcfe.getMessage());
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
}
private static class KilledDuringCommitTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.setFinishTime();
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
}
}
private static class KilledDuringAbortTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.unsuccessfulFinish(JobStateInternal.KILLED);
}
}
@ -1557,11 +1751,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private static class KillWaitTaskCompletedTransition extends
TaskCompletedTransition {
@Override
protected JobStateInternal checkJobForCompletion(JobImpl job) {
protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
if (job.completedTaskCount == job.tasks.size()) {
job.setFinishTime();
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
return job.finished(JobStateInternal.KILLED);
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
return JobStateInternal.KILL_ABORT;
}
//return the current state, Job not finished yet
return job.getInternalState();

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -47,13 +46,13 @@ public class MapTaskImpl extends TaskImpl {
public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler,
Path remoteJobConfFile, JobConf conf,
TaskSplitMetaInfo taskSplitMetaInfo,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
conf, taskAttemptListener, committer, jobToken, credentials, clock,
conf, taskAttemptListener, jobToken, credentials, clock,
completedTasksFromPreviousRun, startCount, metrics, appContext);
this.taskSplitMetaInfo = taskSplitMetaInfo;
}
@ -68,7 +67,7 @@ public class MapTaskImpl extends TaskImpl {
return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
eventHandler, jobFile,
partition, taskSplitMetaInfo, conf, taskAttemptListener,
committer, jobToken, credentials, clock, appContext);
jobToken, credentials, clock, appContext);
}
@Override

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -46,12 +45,12 @@ public class ReduceTaskImpl extends TaskImpl {
public ReduceTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path jobFile, JobConf conf,
int numMapTasks, TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
taskAttemptListener, committer, jobToken, credentials, clock,
taskAttemptListener, jobToken, credentials, clock,
completedTasksFromPreviousRun, startCount, metrics, appContext);
this.numMapTasks = numMapTasks;
}
@ -66,7 +65,7 @@ public class ReduceTaskImpl extends TaskImpl {
return new ReduceTaskAttemptImpl(getID(), nextAttemptNumber,
eventHandler, jobFile,
partition, numMapTasks, conf, taskAttemptListener,
committer, jobToken, credentials, clock, appContext);
jobToken, credentials, clock, appContext);
}
@Override

View File

@ -39,7 +39,6 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -57,7 +56,6 @@ import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
@ -76,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@ -99,7 +98,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
@ -157,7 +155,6 @@ public abstract class TaskAttemptImpl implements
private final Clock clock;
private final org.apache.hadoop.mapred.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private final OutputCommitter committer;
private final Resource resourceCapability;
private final String[] dataLocalHosts;
private final List<String> diagnostics = new ArrayList<String>();
@ -501,7 +498,7 @@ public abstract class TaskAttemptImpl implements
public TaskAttemptImpl(TaskId taskId, int i,
EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
JobConf conf, String[] dataLocalHosts,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
AppContext appContext) {
@ -525,7 +522,6 @@ public abstract class TaskAttemptImpl implements
this.credentials = credentials;
this.jobToken = jobToken;
this.eventHandler = eventHandler;
this.committer = committer;
this.jobFile = jobFile;
this.partition = partition;
@ -1436,10 +1432,8 @@ public abstract class TaskAttemptImpl implements
TaskAttemptContext taskContext =
new TaskAttemptContextImpl(taskAttempt.conf,
TypeConverter.fromYarn(taskAttempt.attemptId));
taskAttempt.eventHandler.handle(new TaskCleanupEvent(
taskAttempt.attemptId,
taskAttempt.committer,
taskContext));
taskAttempt.eventHandler.handle(new CommitterTaskAbortEvent(
taskAttempt.attemptId, taskContext));
}
}

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -100,7 +99,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
protected final JobConf conf;
protected final Path jobFile;
protected final OutputCommitter committer;
protected final int partition;
protected final TaskAttemptListener taskAttemptListener;
protected final EventHandler eventHandler;
@ -278,7 +276,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
public TaskImpl(JobId jobId, TaskType taskType, int partition,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
@ -301,7 +299,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
this.partition = partition;
this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler;
this.committer = committer;
this.credentials = credentials;
this.jobToken = jobToken;
this.metrics = metrics;

View File

@ -48,6 +48,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@ -65,8 +67,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.Clock;
@ -339,8 +339,8 @@ public class RecoveryService extends CompositeService implements Recovery {
return;
}
else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
else if (event.getType() == CommitterEventType.TASK_ABORT) {
TaskAttemptId aId = ((CommitterTaskAbortEvent) event).getAttemptID();
LOG.debug("TASK_CLEAN");
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_CLEANUP_DONE));

View File

@ -1,28 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.taskclean;
import org.apache.hadoop.yarn.event.EventHandler;
public interface TaskCleaner extends EventHandler<TaskCleanupEvent> {
enum EventType {
TASK_CLEAN
}
}

View File

@ -1,124 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.taskclean;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
private static final Log LOG = LogFactory.getLog(TaskCleanerImpl.class);
private final AppContext context;
private ThreadPoolExecutor launcherPool;
private Thread eventHandlingThread;
private BlockingQueue<TaskCleanupEvent> eventQueue =
new LinkedBlockingQueue<TaskCleanupEvent>();
private final AtomicBoolean stopped;
public TaskCleanerImpl(AppContext context) {
super("TaskCleaner");
this.context = context;
this.stopped = new AtomicBoolean(false);
}
public void start() {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("TaskCleaner #%d")
.build();
launcherPool = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
TaskCleanupEvent event = null;
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
// the events from the queue are handled in parallel
// using a thread pool
launcherPool.execute(new EventProcessor(event)); }
}
});
eventHandlingThread.setName("TaskCleaner Event Handler");
eventHandlingThread.start();
super.start();
}
public void stop() {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
eventHandlingThread.interrupt();
launcherPool.shutdown();
super.stop();
}
private class EventProcessor implements Runnable {
private TaskCleanupEvent event;
EventProcessor(TaskCleanupEvent event) {
this.event = event;
}
@Override
public void run() {
LOG.info("Processing the event " + event.toString());
try {
event.getCommitter().abortTask(event.getAttemptContext());
} catch (Exception e) {
LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
}
context.getEventHandler().handle(
new TaskAttemptEvent(event.getAttemptID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
}
}
@Override
public void handle(TaskCleanupEvent event) {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
throw new YarnException(e);
}
}
}

View File

@ -1,56 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.taskclean;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.yarn.event.AbstractEvent;
/**
* This class encapsulates task cleanup event.
*
*/
public class TaskCleanupEvent extends AbstractEvent<TaskCleaner.EventType> {
private final TaskAttemptId attemptID;
private final OutputCommitter committer;
private final TaskAttemptContext attemptContext;
public TaskCleanupEvent(TaskAttemptId attemptID, OutputCommitter committer,
TaskAttemptContext attemptContext) {
super(TaskCleaner.EventType.TASK_CLEAN);
this.attemptID = attemptID;
this.committer = committer;
this.attemptContext = attemptContext;
}
public TaskAttemptId getAttemptID() {
return attemptID;
}
public OutputCommitter getCommitter() {
return committer;
}
public TaskAttemptContext getAttemptContext() {
return attemptContext;
}
}

View File

@ -32,9 +32,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
@ -49,6 +52,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@ -69,8 +74,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
@ -394,8 +397,7 @@ public class MRApp extends MRAppMaster {
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext());
isNewApiCommitter(), currentUser.getUserName(), getContext());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@ -515,16 +517,56 @@ public class MRApp extends MRAppMaster {
}
@Override
protected TaskCleaner createTaskCleaner(AppContext context) {
return new TaskCleaner() {
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, final OutputCommitter committer) {
// create an output committer with the task methods stubbed out
OutputCommitter stubbedCommitter = new OutputCommitter() {
@Override
public void handle(TaskCleanupEvent event) {
//send the cleanup done event
getContext().getEventHandler().handle(
new TaskAttemptEvent(event.getAttemptID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
public void setupJob(JobContext jobContext) throws IOException {
committer.setupJob(jobContext);
}
@SuppressWarnings("deprecation")
@Override
public void cleanupJob(JobContext jobContext) throws IOException {
committer.cleanupJob(jobContext);
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
committer.commitJob(jobContext);
}
@Override
public void abortJob(JobContext jobContext, State state)
throws IOException {
committer.abortJob(jobContext, state);
}
@Override
public boolean isRecoverySupported() {
return committer.isRecoverySupported();
}
@Override
public void setupTask(TaskAttemptContext taskContext)
throws IOException {
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext)
throws IOException {
return false;
}
@Override
public void commitTask(TaskAttemptContext taskContext)
throws IOException {
}
@Override
public void abortTask(TaskAttemptContext taskContext)
throws IOException {
}
@Override
public void recoverTask(TaskAttemptContext taskContext)
throws IOException {
}
};
return new CommitterEventHandler(context, stubbedCommitter);
}
@Override
@ -576,12 +618,11 @@ public class MRApp extends MRAppMaster {
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
OutputCommitter committer, boolean newApiCommitter, String user,
AppContext appContext) {
boolean newApiCommitter, String user, AppContext appContext) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, committer,
getCompletedTaskFromPreviousRun(), metrics,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
appContext);

View File

@ -201,8 +201,7 @@ import org.junit.Test;
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext());
isNewApiCommitter(), currentUser.getUserName(), getContext());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,

View File

@ -19,46 +19,51 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Records;
@ -69,121 +74,223 @@ import org.junit.Test;
/**
* Tests various functions of the JobImpl class
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings({"rawtypes"})
public class TestJobImpl {
@Test
public void testJobNoTasksTransition() {
JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
JobImpl mockJob = mock(JobImpl.class);
public void testJobNoTasks() {
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
OutputCommitter committer = mock(OutputCommitter.class);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
// Force checkJobCompleteSuccess to return null
Task mockTask = mock(Task.class);
Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
tasks.put(mockTask.getID(), mockTask);
mockJob.tasks = tasks;
when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
JobEvent mockJobEvent = mock(JobEvent.class);
JobStateInternal state = trans.transition(mockJob, mockJobEvent);
Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
JobStateInternal.ERROR, state);
JobImpl job = createStubbedJob(conf, dispatcher, 0);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
assertJobState(job, JobStateInternal.SUCCEEDED);
dispatcher.stop();
commitHandler.stop();
}
@Test
public void testCommitJobFailsJob() {
@Test(timeout=20000)
public void testCommitJobFailsJob() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
CyclicBarrier syncBarrier = new CyclicBarrier(2);
OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobImpl mockJob = mock(JobImpl.class);
mockJob.tasks = new HashMap<TaskId, Task>();
OutputCommitter mockCommitter = mock(OutputCommitter.class);
EventHandler mockEventHandler = mock(EventHandler.class);
JobContext mockJobContext = mock(JobContext.class);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
when(mockJob.getCommitter()).thenReturn(mockCommitter);
when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
when(mockJob.getJobContext()).thenReturn(mockJobContext);
when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(
JobStateInternal.KILLED);
when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(
JobStateInternal.FAILED);
when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(
JobStateInternal.SUCCEEDED);
try {
doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
} catch (IOException e) {
// commitJob stubbed out, so this can't happen
}
doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob);
Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
"for successful job", jobState);
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
JobStateInternal.FAILED, jobState);
verify(mockJob).abortJob(
eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
// let the committer fail and verify the job fails
syncBarrier.await();
assertJobState(job, JobStateInternal.FAILED);
dispatcher.stop();
commitHandler.stop();
}
@Test
public void testCheckJobCompleteSuccess() {
JobImpl mockJob = mock(JobImpl.class);
mockJob.tasks = new HashMap<TaskId, Task>();
OutputCommitter mockCommitter = mock(OutputCommitter.class);
EventHandler mockEventHandler = mock(EventHandler.class);
JobContext mockJobContext = mock(JobContext.class);
when(mockJob.getCommitter()).thenReturn(mockCommitter);
when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
when(mockJob.getJobContext()).thenReturn(mockJobContext);
doNothing().when(mockJob).setFinishTime();
doNothing().when(mockJob).logJobHistoryFinishedEvent();
when(mockJob.finished(any(JobStateInternal.class))).thenReturn(
JobStateInternal.SUCCEEDED);
@Test(timeout=20000)
public void testCheckJobCompleteSuccess() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
CyclicBarrier syncBarrier = new CyclicBarrier(2);
OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
try {
doNothing().when(mockCommitter).commitJob(any(JobContext.class));
} catch (IOException e) {
// commitJob stubbed out, so this can't happen
}
doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
"for successful job",
JobImpl.checkJobCompleteSuccess(mockJob));
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
// let the committer complete and verify the job succeeds
syncBarrier.await();
assertJobState(job, JobStateInternal.SUCCEEDED);
dispatcher.stop();
commitHandler.stop();
}
@Test
public void testCheckJobCompleteSuccessFailed() {
JobImpl mockJob = mock(JobImpl.class);
@Test(timeout=20000)
public void testKilledDuringSetup() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
OutputCommitter committer = new StubbedOutputCommitter() {
@Override
public synchronized void setupJob(JobContext jobContext)
throws IOException {
while (!Thread.interrupted()) {
try {
wait();
} catch (InterruptedException e) {
}
}
}
};
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
// Make the completedTasks not equal the getTasks()
Task mockTask = mock(Task.class);
Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
tasks.put(mockTask.getID(), mockTask);
mockJob.tasks = tasks;
try {
// Just in case the code breaks and reaches these calls
OutputCommitter mockCommitter = mock(OutputCommitter.class);
EventHandler mockEventHandler = mock(EventHandler.class);
doNothing().when(mockCommitter).commitJob(any(JobContext.class));
doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
} catch (IOException e) {
e.printStackTrace();
}
Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
"for unsuccessful job",
JobImpl.checkJobCompleteSuccess(mockJob));
JobImpl job = createStubbedJob(conf, dispatcher, 2);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
assertJobState(job, JobStateInternal.KILLED);
dispatcher.stop();
commitHandler.stop();
}
@Test(timeout=20000)
public void testKilledDuringCommit() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
CyclicBarrier syncBarrier = new CyclicBarrier(2);
OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
syncBarrier.await();
job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
assertJobState(job, JobStateInternal.KILLED);
dispatcher.stop();
commitHandler.stop();
}
@Test(timeout=20000)
public void testKilledDuringFailAbort() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
OutputCommitter committer = new StubbedOutputCommitter() {
@Override
public void setupJob(JobContext jobContext) throws IOException {
throw new IOException("forced failure");
}
@Override
public synchronized void abortJob(JobContext jobContext, State state)
throws IOException {
while (!Thread.interrupted()) {
try {
wait();
} catch (InterruptedException e) {
}
}
}
};
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
assertJobState(job, JobStateInternal.FAIL_ABORT);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
assertJobState(job, JobStateInternal.KILLED);
dispatcher.stop();
commitHandler.stop();
}
@Test(timeout=20000)
public void testKilledDuringKillAbort() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
OutputCommitter committer = new StubbedOutputCommitter() {
@Override
public synchronized void abortJob(JobContext jobContext, State state)
throws IOException {
while (!Thread.interrupted()) {
try {
wait();
} catch (InterruptedException e) {
}
}
}
};
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
assertJobState(job, JobStateInternal.KILL_ABORT);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
assertJobState(job, JobStateInternal.KILLED);
dispatcher.stop();
commitHandler.stop();
}
public static void main(String[] args) throws Exception {
TestJobImpl t = new TestJobImpl();
t.testJobNoTasksTransition();
t.testJobNoTasks();
t.testCheckJobCompleteSuccess();
t.testCheckJobCompleteSuccessFailed();
t.testCheckAccess();
t.testReportDiagnostics();
t.testUberDecision();
@ -208,7 +315,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
null, null, null, true, null, 0, null, null);
null, null, true, null, 0, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -219,7 +326,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
null, null, null, true, null, 0, null, null);
null, null, true, null, 0, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -230,7 +337,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
null, null, null, true, null, 0, null, null);
null, null, true, null, 0, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -241,7 +348,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
null, null, null, true, null, 0, null, null);
null, null, true, null, 0, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -252,7 +359,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
null, null, null, true, null, 0, null, null);
null, null, true, null, 0, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@ -270,8 +377,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
mrAppMetrics, mock(OutputCommitter.class),
true, null, 0, null, null);
mrAppMetrics, true, null, 0, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
@ -282,8 +388,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
mrAppMetrics, mock(OutputCommitter.class),
true, null, 0, null, null);
mrAppMetrics, true, null, 0, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
@ -338,20 +443,23 @@ public class TestJobImpl {
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, null,
mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null);
InitTransition initTransition = getInitTransition();
mrAppMetrics, true, null, 0, null, null);
InitTransition initTransition = getInitTransition(2);
JobEvent mockJobEvent = mock(JobEvent.class);
initTransition.transition(job, mockJobEvent);
boolean isUber = job.isUber();
return isUber;
}
private static InitTransition getInitTransition() {
private static InitTransition getInitTransition(final int numSplits) {
InitTransition initTransition = new InitTransition() {
@Override
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
new TaskSplitMetaInfo() };
TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numSplits];
for (int i = 0; i < numSplits; ++i) {
splits[i] = new TaskSplitMetaInfo();
}
return splits;
}
};
return initTransition;
@ -360,19 +468,24 @@ public class TestJobImpl {
@Test
public void testTransitionsAtFailed() throws IOException {
Configuration conf = new Configuration();
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
OutputCommitter committer = mock(OutputCommitter.class);
doThrow(new IOException("forcefail"))
.when(committer).setupJob(any(JobContext.class));
InlineDispatcher dispatcher = new InlineDispatcher();
JobImpl job = new StubbedJob(jobId, Records
.newRecord(ApplicationAttemptId.class), conf,
dispatcher.getEventHandler(), committer, true, null);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
dispatcher.register(JobEventType.class, job);
JobImpl job = createStubbedJob(conf, dispatcher, 2);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
Assert.assertEquals(JobState.FAILED, job.getState());
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
Assert.assertEquals(JobState.FAILED, job.getState());
@ -382,17 +495,86 @@ public class TestJobImpl {
Assert.assertEquals(JobState.FAILED, job.getState());
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
Assert.assertEquals(JobState.FAILED, job.getState());
dispatcher.stop();
commitHandler.stop();
}
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
SystemClock clock = new SystemClock();
AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler());
when(appContext.getClock()).thenReturn(clock);
CommitterEventHandler handler =
new CommitterEventHandler(appContext, committer);
dispatcher.register(CommitterEventType.class, handler);
return handler;
}
private static StubbedJob createStubbedJob(Configuration conf,
Dispatcher dispatcher, int numSplits) {
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
StubbedJob job = new StubbedJob(jobId,
Records.newRecord(ApplicationAttemptId.class), conf,
dispatcher.getEventHandler(), true, "somebody", numSplits);
dispatcher.register(JobEventType.class, job);
EventHandler mockHandler = mock(EventHandler.class);
dispatcher.register(TaskEventType.class, mockHandler);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
mockHandler);
dispatcher.register(JobFinishEvent.Type.class, mockHandler);
return job;
}
private static StubbedJob createRunningStubbedJob(Configuration conf,
Dispatcher dispatcher, int numSplits) {
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
assertJobState(job, JobStateInternal.RUNNING);
return job;
}
private static void completeJobTasks(JobImpl job) {
// complete the map tasks and the reduce tasks so we start committing
int numMaps = job.getTotalMaps();
for (int i = 0; i < numMaps; ++i) {
job.handle(new JobTaskEvent(
MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
TaskState.SUCCEEDED));
Assert.assertEquals(JobState.RUNNING, job.getState());
}
int numReduces = job.getTotalReduces();
for (int i = 0; i < numReduces; ++i) {
job.handle(new JobTaskEvent(
MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
TaskState.SUCCEEDED));
Assert.assertEquals(JobState.RUNNING, job.getState());
}
}
private static void assertJobState(JobImpl job, JobStateInternal state) {
int timeToWaitMsec = 5 * 1000;
while (timeToWaitMsec > 0 && job.getInternalState() != state) {
try {
Thread.sleep(10);
timeToWaitMsec -= 10;
} catch (InterruptedException e) {
break;
}
}
Assert.assertEquals(state, job.getInternalState());
}
private static class StubbedJob extends JobImpl {
//override the init transition
private final InitTransition initTransition = getInitTransition();
StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
= stateMachineFactory.addTransition(JobStateInternal.NEW,
EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
JobEventType.JOB_INIT,
// This is abusive.
initTransition);
private final InitTransition initTransition;
StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
localFactory;
private final StateMachine<JobStateInternal, JobEventType, JobEvent>
localStateMachine;
@ -404,15 +586,102 @@ public class TestJobImpl {
public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
OutputCommitter committer, boolean newApiCommitter, String user) {
boolean newApiCommitter, String user, int numSplits) {
super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(),
new SystemClock(), null, MRAppMetrics.create(), committer,
new SystemClock(), null, MRAppMetrics.create(),
newApiCommitter, user, System.currentTimeMillis(), null, null);
initTransition = getInitTransition(numSplits);
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
JobEventType.JOB_INIT,
// This is abusive.
initTransition);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
localStateMachine = localFactory.make(this);
}
}
private static class StubbedOutputCommitter extends OutputCommitter {
public StubbedOutputCommitter() {
super();
}
@Override
public void setupJob(JobContext jobContext) throws IOException {
}
@Override
public void setupTask(TaskAttemptContext taskContext) throws IOException {
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext)
throws IOException {
return false;
}
@Override
public void commitTask(TaskAttemptContext taskContext) throws IOException {
}
@Override
public void abortTask(TaskAttemptContext taskContext) throws IOException {
}
}
private static class TestingOutputCommitter extends StubbedOutputCommitter {
CyclicBarrier syncBarrier;
boolean shouldSucceed;
public TestingOutputCommitter(CyclicBarrier syncBarrier,
boolean shouldSucceed) {
super();
this.syncBarrier = syncBarrier;
this.shouldSucceed = shouldSucceed;
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
} catch (InterruptedException e) {
}
if (!shouldSucceed) {
throw new IOException("forced failure");
}
}
}
private static class WaitingOutputCommitter extends TestingOutputCommitter {
public WaitingOutputCommitter(CyclicBarrier syncBarrier,
boolean shouldSucceed) {
super(syncBarrier, shouldSucceed);
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
} catch (InterruptedException e) {
}
while (!Thread.interrupted()) {
try {
synchronized (this) {
wait();
}
} catch (InterruptedException e) {
break;
}
}
}
}
}

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -253,10 +252,9 @@ public class TestTaskAttempt{
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
Path jobFile = mock(Path.class);
JobConf jobConf = new JobConf();
OutputCommitter outputCommitter = mock(OutputCommitter.class);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
taskSplitMetaInfo, jobConf, taListener, null,
null, clock, null);
return taImpl;
}
@ -342,7 +340,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
mock(Token.class), new Credentials(),
new SystemClock(), null);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@ -397,7 +395,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@ -453,7 +451,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@ -512,7 +510,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@ -578,7 +576,7 @@ public class TestTaskAttempt{
when(resource.getMemory()).thenReturn(1024);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
jobFile, 1, splits, jobConf, taListener,
mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@ -628,7 +626,7 @@ public class TestTaskAttempt{
when(resource.getMemory()).thenReturn(1024);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
jobFile, 1, splits, jobConf, taListener,
mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -107,7 +106,7 @@ public class TestTaskAttemptContainerRequest {
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
mock(TaskSplitMetaInfo.class), jobConf, taListener,
mock(OutputCommitter.class), jobToken, credentials,
jobToken, credentials,
new SystemClock(), null);
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -71,7 +70,6 @@ public class TestTaskImpl {
private JobConf conf;
private TaskAttemptListener taskAttemptListener;
private OutputCommitter committer;
private Token<JobTokenIdentifier> jobToken;
private JobId jobId;
private Path remoteJobConfFile;
@ -99,13 +97,13 @@ public class TestTaskImpl {
public MockTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
super(jobId, taskType , partition, eventHandler,
remoteJobConfFile, conf, taskAttemptListener, committer,
remoteJobConfFile, conf, taskAttemptListener,
jobToken, credentials, clock,
completedTasksFromPreviousRun, startCount, metrics, appContext);
this.taskType = taskType;
@ -120,7 +118,7 @@ public class TestTaskImpl {
protected TaskAttemptImpl createAttempt() {
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
eventHandler, taskAttemptListener, remoteJobConfFile, partition,
conf, committer, jobToken, credentials, clock, appContext, taskType);
conf, jobToken, credentials, clock, appContext, taskType);
taskAttempts.add(attempt);
return attempt;
}
@ -145,12 +143,11 @@ public class TestTaskImpl {
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
JobConf conf, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
JobConf conf, Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
AppContext appContext, TaskType taskType) {
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
dataLocations, committer, jobToken, credentials, clock, appContext);
dataLocations, jobToken, credentials, clock, appContext);
this.taskType = taskType;
}
@ -210,7 +207,6 @@ public class TestTaskImpl {
conf = new JobConf();
taskAttemptListener = mock(TaskAttemptListener.class);
committer = mock(OutputCommitter.class);
jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
remoteJobConfFile = mock(Path.class);
credentials = null;
@ -235,7 +231,7 @@ public class TestTaskImpl {
private MockTaskImpl createMockTask(TaskType taskType) {
return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock,
completedTasksFromPreviousRun, startCount,
metrics, appContext, taskType);
@ -606,7 +602,7 @@ public class TestTaskImpl {
@Test
public void testFailedTransitions() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock,
completedTasksFromPreviousRun, startCount,
metrics, appContext, TaskType.MAP) {

View File

@ -461,6 +461,15 @@ public interface MRJobConfig {
MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
/**
* How long to wait in milliseconds for the output committer to cancel
* an operation when the job is being killed
*/
public static final String MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
MR_AM_PREFIX + "job.committer.cancel-timeout";
public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
60 * 1000;
/**
* Boolean. Create the base dirs in the JobHistoryEventHandler
* Set to false for multi-user clusters. This is an internal config that

View File

@ -1297,6 +1297,13 @@
For example 50000-50050,50100-50200</description>
</property>
<property>
<name>yarn.app.mapreduce.am.job.committer.cancel-timeout</name>
<value>60000</value>
<description>The amount of time in milliseconds to wait for the output
committer to cancel an operation if the job is killed</description>
</property>
<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>