MAPREDUCE-3528. Fixed TaskHeartBeatHandler to use a new configuration for the thread loop interval separate from task-timeout configuration property. Contributed by Siddharth Seth.
svn merge --ignore-ancestry -c 1229403 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1229405 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
16288c5a7c
commit
70fb248629
|
@ -90,10 +90,15 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3547. Added a bunch of unit tests for the the RM/NM webservices.
|
MAPREDUCE-3547. Added a bunch of unit tests for the the RM/NM webservices.
|
||||||
(Thomas Graves via acmurthy)
|
(Thomas Graves via acmurthy)
|
||||||
|
|
||||||
MAPREDUCE-3610. Remove use of the 'dfs.block.size' config for default block size fetching. Use FS#getDefaultBlocksize instead. (Sho Shimauchi via harsh)
|
MAPREDUCE-3610. Remove use of the 'dfs.block.size' config for default block
|
||||||
|
size fetching. Use FS#getDefaultBlocksize instead. (Sho Shimauchi via harsh)
|
||||||
|
|
||||||
MAPREDUCE-3478. Cannot build against ZooKeeper 3.4.0. (Tom White via mahadev)
|
MAPREDUCE-3478. Cannot build against ZooKeeper 3.4.0. (Tom White via mahadev)
|
||||||
|
|
||||||
|
MAPREDUCE-3528. Fixed TaskHeartBeatHandler to use a new configuration
|
||||||
|
for the thread loop interval separate from task-timeout configuration
|
||||||
|
property. (Siddharth Seth via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
|
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Map;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||||
|
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
* not hear from it for a long time.
|
* not hear from it for a long time.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public class TaskHeartbeatHandler extends AbstractService {
|
public class TaskHeartbeatHandler extends AbstractService {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
|
private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
|
||||||
|
@ -49,6 +51,7 @@ public class TaskHeartbeatHandler extends AbstractService {
|
||||||
private Thread lostTaskCheckerThread;
|
private Thread lostTaskCheckerThread;
|
||||||
private volatile boolean stopped;
|
private volatile boolean stopped;
|
||||||
private int taskTimeOut = 5 * 60 * 1000;// 5 mins
|
private int taskTimeOut = 5 * 60 * 1000;// 5 mins
|
||||||
|
private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
|
||||||
|
|
||||||
private final EventHandler eventHandler;
|
private final EventHandler eventHandler;
|
||||||
private final Clock clock;
|
private final Clock clock;
|
||||||
|
@ -65,7 +68,9 @@ public class TaskHeartbeatHandler extends AbstractService {
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
taskTimeOut = conf.getInt("mapreduce.task.timeout", 5*60*1000);
|
taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
|
||||||
|
taskTimeOutCheckInterval =
|
||||||
|
conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -125,7 +130,7 @@ public class TaskHeartbeatHandler extends AbstractService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Thread.sleep(taskTimeOut);
|
Thread.sleep(taskTimeOutCheckInterval);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.info("TaskHeartbeatHandler thread interrupted");
|
LOG.info("TaskHeartbeatHandler thread interrupted");
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -156,6 +156,8 @@ public interface MRJobConfig {
|
||||||
|
|
||||||
public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
|
public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
|
||||||
|
|
||||||
|
public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
|
||||||
|
|
||||||
public static final String TASK_ID = "mapreduce.task.id";
|
public static final String TASK_ID = "mapreduce.task.id";
|
||||||
|
|
||||||
public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
|
public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
|
||||||
|
|
Loading…
Reference in New Issue