svn merge -c 1372707 from trunk for HDFS-3658. Fix bugs in TestDFSClientRetries and add more tests. (szetszwo)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1372718 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
080455d1cc
commit
4a84c9020d
|
@ -434,6 +434,8 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
HDFS-3790. test_fuse_dfs.c doesn't compile on centos 5. (Colin Patrick
|
HDFS-3790. test_fuse_dfs.c doesn't compile on centos 5. (Colin Patrick
|
||||||
McCabe via atm)
|
McCabe via atm)
|
||||||
|
|
||||||
|
HDFS-3658. Fix bugs in TestDFSClientRetries and add more tests. (szetszwo)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-3042 SUBTASKS
|
BREAKDOWN OF HDFS-3042 SUBTASKS
|
||||||
|
|
||||||
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
|
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
|
||||||
|
|
|
@ -3727,6 +3727,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
|
this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
|
||||||
this.safeReplication = conf.getInt(DFS_NAMENODE_REPLICATION_MIN_KEY,
|
this.safeReplication = conf.getInt(DFS_NAMENODE_REPLICATION_MIN_KEY,
|
||||||
DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
|
DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
|
||||||
|
|
||||||
|
LOG.info(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY + " = " + threshold);
|
||||||
|
LOG.info(DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY + " = " + datanodeThreshold);
|
||||||
|
LOG.info(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + " = " + extension);
|
||||||
|
|
||||||
// default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
|
// default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
|
||||||
this.replQueueThreshold =
|
this.replQueueThreshold =
|
||||||
conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
|
conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
|
||||||
|
|
|
@ -118,6 +118,8 @@ public class MiniDFSCluster {
|
||||||
public static final String PROP_TEST_BUILD_DATA = "test.build.data";
|
public static final String PROP_TEST_BUILD_DATA = "test.build.data";
|
||||||
/** Configuration option to set the data dir: {@value} */
|
/** Configuration option to set the data dir: {@value} */
|
||||||
public static final String HDFS_MINIDFS_BASEDIR = "hdfs.minidfs.basedir";
|
public static final String HDFS_MINIDFS_BASEDIR = "hdfs.minidfs.basedir";
|
||||||
|
public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY
|
||||||
|
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
|
||||||
|
|
||||||
static { DefaultMetricsSystem.setMiniClusterMode(true); }
|
static { DefaultMetricsSystem.setMiniClusterMode(true); }
|
||||||
|
|
||||||
|
@ -574,7 +576,9 @@ public class MiniDFSCluster {
|
||||||
|
|
||||||
int replication = conf.getInt(DFS_REPLICATION_KEY, 3);
|
int replication = conf.getInt(DFS_REPLICATION_KEY, 3);
|
||||||
conf.setInt(DFS_REPLICATION_KEY, Math.min(replication, numDataNodes));
|
conf.setInt(DFS_REPLICATION_KEY, Math.min(replication, numDataNodes));
|
||||||
conf.setInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
|
int safemodeExtension = conf.getInt(
|
||||||
|
DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY, 0);
|
||||||
|
conf.setInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, safemodeExtension);
|
||||||
conf.setInt(DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 second
|
conf.setInt(DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 second
|
||||||
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
StaticMapping.class, DNSToSwitchMapping.class);
|
StaticMapping.class, DNSToSwitchMapping.class);
|
||||||
|
|
|
@ -41,6 +41,7 @@ import java.security.MessageDigest;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -79,8 +80,10 @@ import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.internal.stubbing.answers.ThrowsException;
|
import org.mockito.internal.stubbing.answers.ThrowsException;
|
||||||
|
@ -789,6 +792,8 @@ public class TestDFSClientRetries {
|
||||||
final Path dir = new Path("/testNamenodeRestart");
|
final Path dir = new Path("/testNamenodeRestart");
|
||||||
|
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
|
||||||
|
conf.setInt(MiniDFSCluster.DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY, 5000);
|
||||||
|
|
||||||
final short numDatanodes = 3;
|
final short numDatanodes = 3;
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
@ -811,11 +816,38 @@ public class TestDFSClientRetries {
|
||||||
final FileStatus s1 = fs.getFileStatus(file1);
|
final FileStatus s1 = fs.getFileStatus(file1);
|
||||||
assertEquals(length, s1.getLen());
|
assertEquals(length, s1.getLen());
|
||||||
|
|
||||||
|
//create file4, write some data but not close
|
||||||
|
final Path file4 = new Path(dir, "file4");
|
||||||
|
final FSDataOutputStream out4 = fs.create(file4, false, 4096,
|
||||||
|
fs.getDefaultReplication(file4), 1024L, null);
|
||||||
|
final byte[] bytes = new byte[1000];
|
||||||
|
new Random().nextBytes(bytes);
|
||||||
|
out4.write(bytes);
|
||||||
|
out4.write(bytes);
|
||||||
|
out4.hflush();
|
||||||
|
|
||||||
//shutdown namenode
|
//shutdown namenode
|
||||||
assertTrue(HdfsUtils.isHealthy(uri));
|
assertTrue(HdfsUtils.isHealthy(uri));
|
||||||
cluster.shutdownNameNode(0);
|
cluster.shutdownNameNode(0);
|
||||||
assertFalse(HdfsUtils.isHealthy(uri));
|
assertFalse(HdfsUtils.isHealthy(uri));
|
||||||
|
|
||||||
|
//namenode is down, continue writing file4 in a thread
|
||||||
|
final Thread file4thread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
//write some more data and then close the file
|
||||||
|
out4.write(bytes);
|
||||||
|
out4.write(bytes);
|
||||||
|
out4.write(bytes);
|
||||||
|
out4.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
file4thread.start();
|
||||||
|
|
||||||
//namenode is down, read the file in a thread
|
//namenode is down, read the file in a thread
|
||||||
final Thread reader = new Thread(new Runnable() {
|
final Thread reader = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -874,10 +906,26 @@ public class TestDFSClientRetries {
|
||||||
|
|
||||||
//check file1 and file3
|
//check file1 and file3
|
||||||
thread.join();
|
thread.join();
|
||||||
|
assertEmpty(exceptions);
|
||||||
assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen());
|
assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen());
|
||||||
assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3));
|
assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3));
|
||||||
|
|
||||||
reader.join();
|
reader.join();
|
||||||
|
assertEmpty(exceptions);
|
||||||
|
|
||||||
|
//check file4
|
||||||
|
file4thread.join();
|
||||||
|
assertEmpty(exceptions);
|
||||||
|
{
|
||||||
|
final FSDataInputStream in = fs.open(file4);
|
||||||
|
int count = 0;
|
||||||
|
for(int r; (r = in.read()) != -1; count++) {
|
||||||
|
Assert.assertEquals(String.format("count=%d", count),
|
||||||
|
bytes[count % bytes.length], (byte)r);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(5 * bytes.length, count);
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
|
||||||
//enter safe mode
|
//enter safe mode
|
||||||
assertTrue(HdfsUtils.isHealthy(uri));
|
assertTrue(HdfsUtils.isHealthy(uri));
|
||||||
|
@ -917,18 +965,27 @@ public class TestDFSClientRetries {
|
||||||
LOG.info("GOOD!", fnfe);
|
LOG.info("GOOD!", fnfe);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!exceptions.isEmpty()) {
|
assertEmpty(exceptions);
|
||||||
LOG.error("There are " + exceptions.size() + " exception(s):");
|
|
||||||
for(int i = 0; i < exceptions.size(); i++) {
|
|
||||||
LOG.error("Exception " + i, exceptions.get(i));
|
|
||||||
}
|
|
||||||
fail();
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void assertEmpty(final List<Exception> exceptions) {
|
||||||
|
if (!exceptions.isEmpty()) {
|
||||||
|
final StringBuilder b = new StringBuilder("There are ")
|
||||||
|
.append(exceptions.size())
|
||||||
|
.append(" exception(s):");
|
||||||
|
for(int i = 0; i < exceptions.size(); i++) {
|
||||||
|
b.append("\n Exception ")
|
||||||
|
.append(i)
|
||||||
|
.append(": ")
|
||||||
|
.append(StringUtils.stringifyException(exceptions.get(i)));
|
||||||
|
}
|
||||||
|
fail(b.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static FileSystem createFsWithDifferentUsername(
|
private static FileSystem createFsWithDifferentUsername(
|
||||||
final Configuration conf, final boolean isWebHDFS
|
final Configuration conf, final boolean isWebHDFS
|
||||||
) throws IOException, InterruptedException {
|
) throws IOException, InterruptedException {
|
||||||
|
|
Loading…
Reference in New Issue