Merge trunk into HA branch.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1244645 13f79535-47bb-0310-9956-ffa450edef68
commit 1fb0ab92f8
@@ -169,6 +169,10 @@ Release 0.23.2 - UNRELEASED
     HADOOP-8071. Avoid an extra packet in client code when nagling is
     disabled. (todd)
 
+    HADOOP-6502. Improve the performance of Configuration.getClassByName when
+    the class is not found by caching negative results.
+    (sharad, todd via todd)
+
   BUG FIXES
 
     HADOOP-8042  When copying a file out of HDFS, modifying it, and uploading
@@ -1146,6 +1146,22 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
    * @throws ClassNotFoundException if the class is not found.
    */
   public Class<?> getClassByName(String name) throws ClassNotFoundException {
+    Class<?> ret = getClassByNameOrNull(name);
+    if (ret == null) {
+      throw new ClassNotFoundException("Class " + name + " not found");
+    }
+    return ret;
+  }
+
+  /**
+   * Load a class by name, returning null rather than throwing an exception
+   * if it couldn't be loaded. This is to avoid the overhead of creating
+   * an exception.
+   *
+   * @param name the class name
+   * @return the class object, or null if it could not be found.
+   */
+  public Class<?> getClassByNameOrNull(String name) {
     Map<String, Class<?>> map;
 
     synchronized (CACHE_CLASSES) {
@@ -1157,12 +1173,20 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       }
     }
 
-    Class<?> clazz = map.get(name);
-    if (clazz == null) {
-      clazz = Class.forName(name, true, classLoader);
-      if (clazz != null) {
-        // two putters can race here, but they'll put the same class
-        map.put(name, clazz);
+    Class<?> clazz = null;
+    if (!map.containsKey(name)) {
+      try {
+        clazz = Class.forName(name, true, classLoader);
+      } catch (ClassNotFoundException e) {
+        map.put(name, null); //cache negative that class is not found
+        return null;
+      }
+      // two putters can race here, but they'll put the same class
+      map.put(name, clazz);
+    } else { // check already performed on this class name
+      clazz = map.get(name);
+      if (clazz == null) { // found the negative
+        return null;
       }
     }
 
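The hunks above (HADOOP-6502) make Configuration remember negative lookups: once a class name fails to resolve, the miss is cached and later callers get null back instead of paying for a fresh ClassNotFoundException each time. A minimal standalone sketch of the same pattern, using a hypothetical ClassCache class rather than the Hadoop API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of negative-result caching for class lookups under one ClassLoader.
// A sentinel value marks names that are already known not to resolve.
class ClassCache {
  private static final Class<?> NEGATIVE = ClassCache.class; // sentinel for "not found"
  private final Map<String, Class<?>> cache = new ConcurrentHashMap<>();
  private final ClassLoader loader;

  ClassCache(ClassLoader loader) {
    this.loader = loader;
  }

  Class<?> getOrNull(String name) {
    Class<?> c = cache.get(name);
    if (c == NEGATIVE) {
      return null;                   // cached miss: no exception constructed
    }
    if (c != null) {
      return c;                      // cached hit
    }
    try {
      c = Class.forName(name, true, loader);
    } catch (ClassNotFoundException e) {
      cache.put(name, NEGATIVE);     // remember the miss
      return null;
    }
    cache.put(name, c);
    return c;
  }
}

The ReflectionUtils hunk below relies on the same idea through getClassByNameOrNull: the optional JobConf/JobConfigurable classes are probed with a null check instead of an exception on every configure call.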
@@ -86,17 +86,22 @@ public class ReflectionUtils {
     //invoke configure on theObject
     try {
       Class<?> jobConfClass =
-        conf.getClassByName("org.apache.hadoop.mapred.JobConf");
+        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf");
+      if (jobConfClass == null) {
+        return;
+      }
+
       Class<?> jobConfigurableClass =
-        conf.getClassByName("org.apache.hadoop.mapred.JobConfigurable");
+        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable");
+      if (jobConfigurableClass == null) {
+        return;
+      }
       if (jobConfClass.isAssignableFrom(conf.getClass()) &&
           jobConfigurableClass.isAssignableFrom(theObject.getClass())) {
         Method configureMethod =
           jobConfigurableClass.getMethod("configure", jobConfClass);
         configureMethod.invoke(theObject, conf);
       }
-    } catch (ClassNotFoundException e) {
-      //JobConf/JobConfigurable not in classpath. no need to configure
     } catch (Exception e) {
       throw new RuntimeException("Error in configuring object", e);
     }
@@ -216,6 +216,13 @@
   determine the host, port, etc. for a filesystem.</description>
 </property>
 
+<property>
+  <name>fs.default.name</name>
+  <value>file:///</value>
+  <description>Deprecated. Use (fs.defaultFS) property
+  instead</description>
+</property>
+
 <property>
   <name>fs.trash.interval</name>
   <value>0</value>
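The new core-default.xml entry keeps fs.default.name only as a deprecated pointer to fs.defaultFS, and the rest of this merge moves tests and tools to the new key. A short example of setting it programmatically, with the local filesystem chosen purely for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Sketch: prefer the fs.defaultFS key over the deprecated fs.default.name.
public class DefaultFsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");   // new key; fs.default.name is the old alias
    FileSystem fs = FileSystem.get(conf);   // resolves to the local filesystem
    System.out.println(fs.getUri());        // file:///
  }
}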
@@ -210,6 +210,9 @@ Trunk (unreleased changes)
     dfs.client.block.write.replace-datanode-on-failure.enable to be mistakenly
     disabled. (atm)
 
+    HDFS-2525. Race between BlockPoolSliceScanner and append. (Brandon Li
+    via jitendra)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -245,6 +248,9 @@ Release 0.23.2 - UNRELEASED
     HDFS-2815. Namenode sometimes does not come out of safemode during
     NN crash + restart. (Uma Maheswara Rao via suresh)
 
+    HDFS-2950. Secondary NN HTTPS address should be listed as a
+    NAMESERVICE_SPECIFIC_KEY. (todd)
+
 Release 0.23.1 - 2012-02-08
 
   INCOMPATIBLE CHANGES
@@ -99,6 +99,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int    DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
   public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address";
   public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090";
+  public static final String DFS_NAMENODE_SECONDARY_HTTPS_PORT_KEY = "dfs.namenode.secondary.https-port";
+  public static final int    DFS_NAMENODE_SECONDARY_HTTPS_PORT_DEFAULT = 50490;
   public static final String DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY = "dfs.namenode.checkpoint.check.period";
   public static final long   DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT = 60;
   public static final String DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period";
@@ -81,6 +81,7 @@ public class HdfsConfiguration extends Configuration {
     deprecate("dfs.safemode.extension", DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY);
     deprecate("dfs.safemode.threshold.pct", DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY);
     deprecate("dfs.secondary.http.address", DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
+    deprecate("dfs.secondary.https.port", DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_PORT_KEY);
     deprecate("dfs.socket.timeout", DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY);
     deprecate("fs.checkpoint.dir", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY);
     deprecate("fs.checkpoint.edits.dir", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
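The added deprecate() call wires the old dfs.secondary.https.port key to the new DFS_NAMENODE_SECONDARY_HTTPS_PORT_KEY constant so existing configuration files keep working. A toy sketch of that key-aliasing idea (not the Hadoop implementation):

import java.util.HashMap;
import java.util.Map;

// Sketch of deprecated-key aliasing: reads and writes of an old key fall
// through to the value stored under its replacement key.
class AliasingConfig {
  private final Map<String, String> values = new HashMap<>();
  private final Map<String, String> deprecated = new HashMap<>();

  void deprecate(String oldKey, String newKey) {
    deprecated.put(oldKey, newKey);
  }

  void set(String key, String value) {
    values.put(deprecated.getOrDefault(key, key), value);  // writes land on the new key
  }

  String get(String key) {
    return values.get(deprecated.getOrDefault(key, key));  // reads follow the alias
  }
}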
@@ -51,11 +51,8 @@ import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 
 /**
- * Performs two types of scanning:
- * <li> Gets block files from the data directories and reconciles the
- * difference between the blocks on the disk and in memory.</li>
- * <li> Scans the data directories for block files under a block pool
- * and verifies that the files are not corrupt</li>
+ * Scans the block files under a block pool and verifies that the
+ * files are not corrupt.
  * This keeps track of blocks and their last verification times.
  * Currently it does not modify the metadata for block.
  */
@@ -430,6 +427,19 @@ class BlockPoolSliceScanner {
         return;
       }
 
+      // If the block exists, the exception may due to a race with write:
+      // The BlockSender got an old block path in rbw. BlockReceiver removed
+      // the rbw block from rbw to finalized but BlockSender tried to open the
+      // file before BlockReceiver updated the VolumeMap. The state of the
+      // block can be changed again now, so ignore this error here. If there
+      // is a block really deleted by mistake, DirectoryScan should catch it.
+      if (e instanceof FileNotFoundException ) {
+        LOG.info("Verification failed for " + block +
+            ". It may be due to race with write.");
+        deleteBlock(block.getLocalBlock());
+        return;
+      }
+
       LOG.warn((second ? "Second " : "First ") + "Verification failed for "
           + block, e);
 
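The new branch treats a FileNotFoundException during block verification as a probable race with a concurrent write (the replica just moved out of rbw) rather than as corruption, leaving genuinely missing blocks for the directory scanner to flag. A generic sketch of that tolerate-FNF shape, with illustrative names only:

import java.io.FileNotFoundException;
import java.io.IOException;

// Sketch: when a file can legitimately be renamed by a concurrent writer,
// a FileNotFoundException from the reader is treated as "skip this pass"
// rather than as data loss.
class ScanStep {
  interface BlockHandle {
    void openAndVerify() throws IOException;
  }

  boolean verifyOnce(BlockHandle block) throws IOException {
    try {
      block.openAndVerify();
      return true;
    } catch (FileNotFoundException fnf) {
      // The file may have just moved (e.g. rbw -> finalized); let a later
      // scan or the directory scanner decide whether it is really gone.
      return false;
    }
  }
}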
@@ -146,6 +146,7 @@ public class NameNode {
     DFS_NAMENODE_HTTPS_ADDRESS_KEY,
     DFS_NAMENODE_KEYTAB_FILE_KEY,
     DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
+    DFS_NAMENODE_SECONDARY_HTTPS_PORT_KEY,
     DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
     DFS_NAMENODE_BACKUP_ADDRESS_KEY,
     DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
@@ -254,7 +254,8 @@ public class SecondaryNameNode implements Runnable {
           Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
       InetSocketAddress secInfoSocAddr =
           NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.getInt(
-            "dfs.secondary.https.port", 443));
+            DFS_NAMENODE_SECONDARY_HTTPS_PORT_KEY,
+            DFS_NAMENODE_SECONDARY_HTTPS_PORT_DEFAULT));
       imagePort = secInfoSocAddr.getPort();
       infoServer.addSslListener(secInfoSocAddr, conf, false, true);
     }
@@ -47,12 +47,6 @@ public class TestAppendDifferentChecksum {
   public static void setupCluster() throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
-
-    // disable block scanner, since otherwise this test can trigger
-    // HDFS-2525, which is a different bug than we're trying to unit test
-    // here! When HDFS-2525 is fixed, this can be removed.
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
-
     conf.set("fs.hdfs.impl.disable.cache", "true");
     cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(1)
@@ -94,6 +94,9 @@ Release 0.23.2 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-3854. Fixed and reenabled tests related to MR child JVM's
+    environmental variables in TestMiniMRChildTask. (Tom White via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -104,6 +107,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3852. Test TestLinuxResourceCalculatorPlugin failing. (Thomas
     Graves via mahadev)
 
+    MAPREDUCE-3736. Variable substitution depth too large for fs.default.name
+    causes jobs to fail (ahmed via tucu).
+
 Release 0.23.1 - 2012-02-08
 
   INCOMPATIBLE CHANGES
@@ -812,6 +818,9 @@ Release 0.23.1 - 2012-02-08
     MAPREDUCE-3802. Added test to validate that AM can crash multiple times and
     still can recover successfully after MAPREDUCE-3846. (vinodkv)
 
+    MAPREDUCE-3858. Task attempt failure during commit results in task never completing.
+    (Tom White via mahadev)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
@@ -832,6 +832,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     public TaskState transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+        task.commitAttempt = null;
+      }
       TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
       if (attempt.getAssignedContainerMgrAddress() != null) {
         //container was assigned
@@ -877,6 +880,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   protected void unSucceed(TaskImpl task) {
     ++task.numberUncompletedAttempts;
+    task.commitAttempt = null;
     task.successfulAttempt = null;
   }
 }
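The TaskImpl changes (MAPREDUCE-3858) clear commitAttempt when the attempt that owned the commit slot fails, so a retried attempt can later pass canCommit() and the task can actually finish. A small sketch of that single-committer rule, with hypothetical names rather than the TaskImpl API:

// Sketch of single-committer bookkeeping: only the attempt that currently owns
// the commit slot may commit, and a failed owner releases the slot so a retry
// can take it over.
class CommitSlot {
  private String commitAttempt;     // id of the attempt allowed to commit, or null

  synchronized boolean canCommit(String attemptId) {
    if (commitAttempt == null) {
      commitAttempt = attemptId;    // first asker takes the slot
    }
    return attemptId.equals(commitAttempt);
  }

  synchronized void attemptFailed(String attemptId) {
    if (attemptId.equals(commitAttempt)) {
      commitAttempt = null;         // release the slot so another attempt can commit
    }
  }
}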
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -261,6 +263,12 @@ public class TestTaskImpl {
     assertTaskRunningState();
   }
 
+  private void commitTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId,
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    assertTaskRunningState();
+  }
+
   private MockTaskAttemptImpl getLastAttempt() {
     return taskAttempts.get(taskAttempts.size()-1);
   }
@@ -279,32 +287,45 @@ public class TestTaskImpl {
     assertTaskRunningState();
   }
 
+  private void failRunningTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId,
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertTaskRunningState();
+  }
+
   /**
    * {@link TaskState#NEW}
    */
   private void assertTaskNewState() {
-    assertEquals(mockTask.getState(), TaskState.NEW);
+    assertEquals(TaskState.NEW, mockTask.getState());
   }
 
   /**
    * {@link TaskState#SCHEDULED}
    */
   private void assertTaskScheduledState() {
-    assertEquals(mockTask.getState(), TaskState.SCHEDULED);
+    assertEquals(TaskState.SCHEDULED, mockTask.getState());
   }
 
   /**
    * {@link TaskState#RUNNING}
    */
   private void assertTaskRunningState() {
-    assertEquals(mockTask.getState(), TaskState.RUNNING);
+    assertEquals(TaskState.RUNNING, mockTask.getState());
   }
 
   /**
    * {@link TaskState#KILL_WAIT}
    */
   private void assertTaskKillWaitState() {
-    assertEquals(mockTask.getState(), TaskState.KILL_WAIT);
+    assertEquals(TaskState.KILL_WAIT, mockTask.getState());
+  }
+
+  /**
+   * {@link TaskState#SUCCEEDED}
+   */
+  private void assertTaskSucceededState() {
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
   }
 
   @Test
@@ -410,4 +431,31 @@ public class TestTaskImpl {
 
   }
 
+  @Test
+  public void testFailureDuringTaskAttemptCommit() {
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+
+    // During the task attempt commit there is an exception which causes
+    // the attempt to fail
+    updateLastAttemptState(TaskAttemptState.FAILED);
+    failRunningTaskAttempt(getLastAttempt().getAttemptId());
+
+    assertEquals(2, taskAttempts.size());
+    updateLastAttemptState(TaskAttemptState.SUCCEEDED);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    assertFalse("First attempt should not commit",
+        mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
+    assertTrue("Second attempt should commit",
+        mockTask.canCommit(getLastAttempt().getAttemptId()));
+
+    assertTaskSucceededState();
+  }
+
 }
@@ -159,7 +159,7 @@ public class TestMRWithDistributedCache extends TestCase {
   public void testLocalJobRunner() throws Exception {
     Configuration c = new Configuration();
     c.set(JTConfig.JT_IPC_ADDRESS, "local");
-    c.set("fs.default.name", "file:///");
+    c.set("fs.defaultFS", "file:///");
     testWithConf(c);
   }
 
@@ -59,7 +59,7 @@ public class TestNoDefaultsJobConf extends HadoopTestCase {
 
     JobConf conf = new JobConf(false);
 
-    conf.set("fs.default.name", createJobConf().get("fs.default.name"));
+    conf.set("fs.defaultFS", createJobConf().get("fs.defaultFS"));
 
     conf.setJobName("mr");
 
@@ -1024,7 +1024,7 @@ public class JHLogAnalyzer {
     if(testFile != null) {
       LOG.info("Start JHLA test ============ ");
       LocalFileSystem lfs = FileSystem.getLocal(conf);
-      conf.set("fs.default.name", "file:///");
+      conf.set("fs.defaultFS", "file:///");
       JHLAMapper map = new JHLAMapper(conf);
       map.parseLogFile(lfs, new Path(testFile), 0L,
           new LoggingCollector(), Reporter.NULL);
@@ -53,7 +53,7 @@ public class FileBench extends Configured implements Tool {
     "unless they are also explicitly included, as in \"-pln -zip\"\n" +
     "Note that CompressionType params only apply to SequenceFiles\n\n" +
     "Useful options to set:\n" +
-    "-D fs.default.name=\"file:///\" \\\n" +
+    "-D fs.defaultFS=\"file:///\" \\\n" +
     "-D fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem \\\n" +
     "-D filebench.file.bytes=$((10*1024*1024*1024)) \\\n" +
     "-D filebench.key.words=5 \\\n" +
@@ -41,7 +41,7 @@ public class TestCombineFileInputFormat {
   private static FileSystem localFs = null;
   static {
     try {
-      defaultConf.set("fs.default.name", "file:///");
+      defaultConf.set("fs.defaultFS", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
@@ -69,7 +69,7 @@ public class TestConcatenatedCompressedInput {
 
   static {
     try {
-      defaultConf.set("fs.default.name", "file:///");
+      defaultConf.set("fs.defaultFS", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
@@ -219,8 +219,8 @@ public class TestMiniMRChildTask {
 
       // check if X=$X:/abc works for LD_LIBRARY_PATH
       checkEnv("LD_LIBRARY_PATH", "/tmp", "append");
-      // check if X=/tmp works for an already existing parameter
-      checkEnv("HOME", "/tmp", "noappend");
+      // check if X=y works for an already existing parameter
+      checkEnv("LANG", "en_us_8859_1", "noappend");
       // check if X=/tmp for a new env variable
       checkEnv("MY_PATH", "/tmp", "noappend");
       // check if X=$X:/tmp works for a new env var and results into :/tmp
@@ -269,8 +269,8 @@ public class TestMiniMRChildTask {
 
       // check if X=$X:/abc works for LD_LIBRARY_PATH
       checkEnv("LD_LIBRARY_PATH", "/tmp", "append");
-      // check if X=/tmp works for an already existing parameter
-      checkEnv("HOME", "/tmp", "noappend");
+      // check if X=y works for an already existing parameter
+      checkEnv("LANG", "en_us_8859_1", "noappend");
       // check if X=/tmp for a new env variable
       checkEnv("MY_PATH", "/tmp", "noappend");
       // check if X=$X:/tmp works for a new env var and results into :/tmp
@@ -369,7 +369,7 @@ public class TestMiniMRChildTask {
    * - x=y (x can be a already existing env variable or a new variable)
    * - x=$x:y (replace $x with the current value of x)
    */
-
+  @Test
   public void testTaskEnv(){
     try {
       JobConf conf = new JobConf(mr.getConfig());
@@ -392,6 +392,7 @@ public class TestMiniMRChildTask {
    * - x=y (x can be a already existing env variable or a new variable)
    * - x=$x:y (replace $x with the current value of x)
    */
+  @Test
   public void testTaskOldEnv(){
     try {
       JobConf conf = new JobConf(mr.getConfig());
@@ -415,7 +416,7 @@ public class TestMiniMRChildTask {
         EnvCheckMapper.class, EnvCheckReducer.class);
     // test
     //  - new SET of new var (MY_PATH)
-    //  - set of old var (HOME)
+    //  - set of old var (LANG)
     //  - append to an old var from modified env (LD_LIBRARY_PATH)
     //  - append to an old var from tt's env (PATH)
     //  - append to a new var (NEW_PATH)
@@ -432,10 +433,10 @@ public class TestMiniMRChildTask {
       mapTaskJavaOpts = reduceTaskJavaOpts = TASK_OPTS_VAL;
     }
     conf.set(mapTaskEnvKey,
-        "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
+        "MY_PATH=/tmp,LANG=en_us_8859_1,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
         "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
     conf.set(reduceTaskEnvKey,
-        "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
+        "MY_PATH=/tmp,LANG=en_us_8859_1,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
         "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
     conf.set("path", System.getenv("PATH"));
     conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts);
@@ -55,7 +55,7 @@ public class TestTextInputFormat {
   private static FileSystem localFs = null;
   static {
     try {
-      defaultConf.set("fs.default.name", "file:///");
+      defaultConf.set("fs.defaultFS", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
@@ -314,7 +314,7 @@ public class TestMapCollection {
     job.setNumReduceTasks(1);
     job.getConfiguration().set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
     job.getConfiguration().setInt(MRJobConfig.IO_SORT_FACTOR, 1000);
-    job.getConfiguration().set("fs.default.name", "file:///");
+    job.getConfiguration().set("fs.defaultFS", "file:///");
     job.getConfiguration().setInt("test.mapcollection.num.maps", 1);
     job.setInputFormatClass(FakeIF.class);
     job.setOutputFormatClass(NullOutputFormat.class);
@@ -45,9 +45,9 @@ public class TestFileInputFormat {
   @Test
   public void testAddInputPath() throws IOException {
     final Configuration conf = new Configuration();
-    conf.set("fs.default.name", "s3://abc:xyz@hostname/");
+    conf.set("fs.defaultFS", "s3://abc:xyz@hostname/");
     final Job j = Job.getInstance(conf);
-    j.getConfiguration().set("fs.default.name", "s3://abc:xyz@hostname/");
+    j.getConfiguration().set("fs.defaultFS", "s3://abc:xyz@hostname/");
 
     //setup default fs
     final FileSystem defaultfs = FileSystem.get(conf);
@@ -57,7 +57,7 @@ public class TestMRKeyValueTextInputFormat {
   private static FileSystem localFs = null;
   static {
     try {
-      defaultConf.set("fs.default.name", "file:///");
+      defaultConf.set("fs.defaultFS", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
@@ -457,7 +457,7 @@
 
   <property>
     <name>mapreduce.job.hdfs-servers</name>
-    <value>${fs.default.name}</value>
+    <value>${fs.defaultFS}</value>
   </property>
 
   <!-- WebAppProxy Configuration-->