HDFS-3910. DFSTestUtil#waitReplication should timeout. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1383618 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-09-11 21:24:12 +00:00
parent de2efbe3ba
commit dc1fd84bd1
21 changed files with 110 additions and 75 deletions

View File

@ -466,6 +466,8 @@ Release 2.0.2-alpha - 2012-09-07
HDFS-3907. Allow multiple users for local block readers. (eli) HDFS-3907. Allow multiple users for local block readers. (eli)
HDFS-3910. DFSTestUtil#waitReplication should timeout. (eli)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log HDFS-2982. Startup performance suffers when there are many edit log

View File

@ -273,7 +273,7 @@ void setReplication(FileSystem fs, String topdir, short value)
* specified target. * specified target.
*/ */
public void waitReplication(FileSystem fs, String topdir, short value) public void waitReplication(FileSystem fs, String topdir, short value)
throws IOException { throws IOException, InterruptedException, TimeoutException {
Path root = new Path(topdir); Path root = new Path(topdir);
/** wait for the replication factor to settle down */ /** wait for the replication factor to settle down */
@ -498,36 +498,44 @@ public String[] getFileNames(String topDir) {
return fileNames; return fileNames;
} }
} }
/** wait for the file's replication to be done */ /**
public static void waitReplication(FileSystem fs, Path fileName, * Wait for the given file to reach the given replication factor.
short replFactor) throws IOException { * @throws TimeoutException if we fail to sufficiently replicate the file
boolean good; */
public static void waitReplication(FileSystem fs, Path fileName, short replFactor)
throws IOException, InterruptedException, TimeoutException {
boolean correctReplFactor;
final int ATTEMPTS = 20;
int count = 0;
do { do {
good = true; correctReplFactor = true;
BlockLocation locs[] = fs.getFileBlockLocations( BlockLocation locs[] = fs.getFileBlockLocations(
fs.getFileStatus(fileName), 0, Long.MAX_VALUE); fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
count++;
for (int j = 0; j < locs.length; j++) { for (int j = 0; j < locs.length; j++) {
String[] hostnames = locs[j].getNames(); String[] hostnames = locs[j].getNames();
if (hostnames.length != replFactor) { if (hostnames.length != replFactor) {
String hostNameList = ""; correctReplFactor = false;
for (String h : hostnames) hostNameList += h + " "; System.out.println("Block " + j + " of file " + fileName
System.out.println("Block " + j + " of file " + fileName + " has replication factor " + hostnames.length
+ " has replication factor " + hostnames.length + "; locations " + " (desired " + replFactor + "); locations "
+ hostNameList); + Joiner.on(' ').join(hostnames));
good = false; Thread.sleep(1000);
try {
System.out.println("Waiting for replication factor to drain");
Thread.sleep(100);
} catch (InterruptedException e) {}
break; break;
} }
} }
if (good) { if (correctReplFactor) {
System.out.println("All blocks of file " + fileName System.out.println("All blocks of file " + fileName
+ " verified to have replication factor " + replFactor); + " verified to have replication factor " + replFactor);
} }
} while(!good); } while (!correctReplFactor && count < ATTEMPTS);
if (count == ATTEMPTS) {
throw new TimeoutException("Timed out waiting for " + fileName +
" to reach " + replFactor + " replicas");
}
} }
/** delete directory and everything underneath it.*/ /** delete directory and everything underneath it.*/

View File

@ -61,7 +61,7 @@ public static void teardownCluster() {
* of this class might immediately issue a retry on failure, so it's polite. * of this class might immediately issue a retry on failure, so it's polite.
*/ */
@Test @Test
public void testStablePositionAfterCorruptRead() throws IOException { public void testStablePositionAfterCorruptRead() throws Exception {
final short REPL_FACTOR = 1; final short REPL_FACTOR = 1;
final long FILE_LENGTH = 512L; final long FILE_LENGTH = 512L;
cluster.waitActive(); cluster.waitActive();

View File

@ -24,6 +24,7 @@
import java.io.PrintStream; import java.io.PrintStream;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -199,11 +200,11 @@ public void testCorruptTwoOutOfThreeReplicas() throws Exception {
} }
/** /**
* create a file with one block and corrupt some/all of the block replicas. * Create a file with one block and corrupt some/all of the block replicas.
*/ */
private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl, private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
int corruptBlockCount) throws IOException, AccessControlException, int corruptBlockCount) throws IOException, AccessControlException,
FileNotFoundException, UnresolvedLinkException { FileNotFoundException, UnresolvedLinkException, InterruptedException, TimeoutException {
DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0); DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0);
DFSTestUtil.waitReplication(dfs, filePath, repl); DFSTestUtil.waitReplication(dfs, filePath, repl);
// Locate the file blocks by asking name node // Locate the file blocks by asking name node

View File

@ -789,8 +789,7 @@ public void testClientDNProtocolTimeout() throws IOException {
* way. See HDFS-3067. * way. See HDFS-3067.
*/ */
@Test @Test
public void testRetryOnChecksumFailure() public void testRetryOnChecksumFailure() throws Exception {
throws UnresolvedLinkException, IOException {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster = MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

View File

@ -173,7 +173,7 @@ public static boolean corruptReplica(ExtendedBlock blk, int replica) throws IOEx
} }
@Test @Test
public void testBlockCorruptionPolicy() throws IOException { public void testBlockCorruptionPolicy() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
Random random = new Random(); Random random = new Random();

View File

@ -25,6 +25,7 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
@ -100,7 +101,7 @@ private static void writeFile(FileSystem fileSys, Path name, int repl,
} }
private void checkFile(FileSystem fileSys, Path name, int repl) private void checkFile(FileSystem fileSys, Path name, int repl)
throws IOException { throws IOException, InterruptedException, TimeoutException {
DFSTestUtil.waitReplication(fileSys, name, (short) repl); DFSTestUtil.waitReplication(fileSys, name, (short) repl);
} }
@ -129,7 +130,7 @@ public void testGetFileInfo() throws IOException {
/** Test the FileStatus obtained calling getFileStatus on a file */ /** Test the FileStatus obtained calling getFileStatus on a file */
@Test @Test
public void testGetFileStatusOnFile() throws IOException { public void testGetFileStatusOnFile() throws Exception {
checkFile(fs, file1, 1); checkFile(fs, file1, 1);
// test getFileStatus on a file // test getFileStatus on a file
FileStatus status = fs.getFileStatus(file1); FileStatus status = fs.getFileStatus(file1);

View File

@ -27,6 +27,7 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Iterator; import java.util.Iterator;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -420,8 +421,8 @@ public void testReplicateLenMismatchedBlock() throws Exception {
} }
} }
private void changeBlockLen(MiniDFSCluster cluster, private void changeBlockLen(MiniDFSCluster cluster, int lenDelta)
int lenDelta) throws IOException, InterruptedException { throws IOException, InterruptedException, TimeoutException {
final Path fileName = new Path("/file1"); final Path fileName = new Path("/file1");
final short REPLICATION_FACTOR = (short)1; final short REPLICATION_FACTOR = (short)1;
final FileSystem fs = cluster.getFileSystem(); final FileSystem fs = cluster.getFileSystem();

View File

@ -88,7 +88,7 @@ static void initConf(Configuration conf) {
/* create a file with a length of <code>fileLen</code> */ /* create a file with a length of <code>fileLen</code> */
static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen, static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
short replicationFactor, int nnIndex) short replicationFactor, int nnIndex)
throws IOException { throws IOException, InterruptedException, TimeoutException {
FileSystem fs = cluster.getFileSystem(nnIndex); FileSystem fs = cluster.getFileSystem(nnIndex);
DFSTestUtil.createFile(fs, filePath, fileLen, DFSTestUtil.createFile(fs, filePath, fileLen,
replicationFactor, r.nextLong()); replicationFactor, r.nextLong());
@ -100,7 +100,7 @@ static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
* whose used space to be <code>size</code> * whose used space to be <code>size</code>
*/ */
private ExtendedBlock[] generateBlocks(Configuration conf, long size, private ExtendedBlock[] generateBlocks(Configuration conf, long size,
short numNodes) throws IOException { short numNodes) throws IOException, InterruptedException, TimeoutException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build();
try { try {
cluster.waitActive(); cluster.waitActive();

View File

@ -23,6 +23,7 @@
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -96,7 +97,7 @@ private static class Suite {
/* create a file with a length of <code>fileLen</code> */ /* create a file with a length of <code>fileLen</code> */
private static void createFile(Suite s, int index, long len private static void createFile(Suite s, int index, long len
) throws IOException { ) throws IOException, InterruptedException, TimeoutException {
final FileSystem fs = s.cluster.getFileSystem(index); final FileSystem fs = s.cluster.getFileSystem(index);
DFSTestUtil.createFile(fs, FILE_PATH, len, s.replication, RANDOM.nextLong()); DFSTestUtil.createFile(fs, FILE_PATH, len, s.replication, RANDOM.nextLong());
DFSTestUtil.waitReplication(fs, FILE_PATH, s.replication); DFSTestUtil.waitReplication(fs, FILE_PATH, s.replication);
@ -106,7 +107,7 @@ private static void createFile(Suite s, int index, long len
* whose used space to be <code>size</code> * whose used space to be <code>size</code>
*/ */
private static ExtendedBlock[][] generateBlocks(Suite s, long size private static ExtendedBlock[][] generateBlocks(Suite s, long size
) throws IOException { ) throws IOException, InterruptedException, TimeoutException {
final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][]; final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
for(int n = 0; n < s.clients.length; n++) { for(int n = 0; n < s.clients.length; n++) {
final long fileLen = size/s.replication; final long fileLen = size/s.replication;

View File

@ -53,7 +53,7 @@ public class TestOverReplicatedBlocks {
* corrupt ones. * corrupt ones.
*/ */
@Test @Test
public void testProcesOverReplicateBlock() throws IOException { public void testProcesOverReplicateBlock() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set( conf.set(
@ -141,7 +141,7 @@ public void testProcesOverReplicateBlock() throws IOException {
* send heartbeats. * send heartbeats.
*/ */
@Test @Test
public void testChooseReplicaToDelete() throws IOException { public void testChooseReplicaToDelete() throws Exception {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
FileSystem fs = null; FileSystem fs = null;
try { try {

View File

@ -89,7 +89,7 @@ public void testThrottler() throws IOException {
} }
@Test @Test
public void testBlockReplacement() throws IOException, TimeoutException { public void testBlockReplacement() throws Exception {
final Configuration CONF = new HdfsConfiguration(); final Configuration CONF = new HdfsConfiguration();
final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"}; final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};
final String[] NEW_RACKS = {"/RACK2"}; final String[] NEW_RACKS = {"/RACK2"};

View File

@ -27,6 +27,9 @@
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -65,7 +68,7 @@
/** /**
* This test simulates a variety of situations when blocks are being * This test simulates a variety of situations when blocks are being
* intentionally orrupted, unexpectedly modified, and so on before a block * intentionally corrupted, unexpectedly modified, and so on before a block
* report is happening * report is happening
*/ */
public class TestBlockReport { public class TestBlockReport {
@ -316,7 +319,7 @@ public void blockReport_05() {
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test
public void blockReport_06() throws IOException { public void blockReport_06() throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1; final int DN_N1 = DN_N0 + 1;
@ -353,7 +356,7 @@ public void blockReport_06() throws IOException {
@Test @Test
// Currently this test is failing as expected 'cause the correct behavior is // Currently this test is failing as expected 'cause the correct behavior is
// not yet implemented (9/15/09) // not yet implemented (9/15/09)
public void blockReport_07() throws IOException { public void blockReport_07() throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1; final int DN_N1 = DN_N0 + 1;
@ -670,21 +673,24 @@ private ArrayList<Block> writeFile(final String METHOD_NAME,
} }
private void startDNandWait(Path filePath, boolean waitReplicas) private void startDNandWait(Path filePath, boolean waitReplicas)
throws IOException { throws IOException, InterruptedException, TimeoutException {
if(LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Before next DN start: " + cluster.getDataNodes().size()); LOG.debug("Before next DN start: " + cluster.getDataNodes().size());
} }
cluster.startDataNodes(conf, 1, true, null, null); cluster.startDataNodes(conf, 1, true, null, null);
cluster.waitClusterUp();
ArrayList<DataNode> datanodes = cluster.getDataNodes(); ArrayList<DataNode> datanodes = cluster.getDataNodes();
assertEquals(datanodes.size(), 2); assertEquals(datanodes.size(), 2);
if(LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
int lastDn = datanodes.size() - 1; int lastDn = datanodes.size() - 1;
LOG.debug("New datanode " LOG.debug("New datanode "
+ cluster.getDataNodes().get(lastDn).getDisplayName() + cluster.getDataNodes().get(lastDn).getDisplayName()
+ " has been started"); + " has been started");
} }
if (waitReplicas) DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR); if (waitReplicas) {
DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);
}
} }
private ArrayList<Block> prepareForRide(final Path filePath, private ArrayList<Block> prepareForRide(final Path filePath,
@ -836,8 +842,9 @@ public BlockChecker(final Path filePath) {
public void run() { public void run() {
try { try {
startDNandWait(filePath, true); startDNandWait(filePath, true);
} catch (IOException e) { } catch (Exception e) {
LOG.warn("Shouldn't happen", e); e.printStackTrace();
Assert.fail("Failed to start BlockChecker: " + e);
} }
} }
} }

View File

@ -105,7 +105,7 @@ public void tearDown() throws Exception {
* failure if the configuration parameter allows this. * failure if the configuration parameter allows this.
*/ */
@Test @Test
public void testVolumeFailure() throws IOException { public void testVolumeFailure() throws Exception {
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
dataDir = new File(cluster.getDataDirectory()); dataDir = new File(cluster.getDataDirectory());
System.out.println("Data dir: is " + dataDir.getPath()); System.out.println("Data dir: is " + dataDir.getPath());

View File

@ -137,7 +137,7 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
} }
// test recovering unlinked tmp replicas // test recovering unlinked tmp replicas
@Test public void testRecoverReplicas() throws IOException { @Test public void testRecoverReplicas() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L);
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512); conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);

View File

@ -116,7 +116,7 @@ public void testDisplayRecentEditLogOpCodes() throws IOException {
* automatically bumped up to the new minimum upon restart. * automatically bumped up to the new minimum upon restart.
*/ */
@Test @Test
public void testReplicationAdjusted() throws IOException { public void testReplicationAdjusted() throws Exception {
// start a cluster // start a cluster
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
// Replicate and heartbeat fast to shave a few seconds off test // Replicate and heartbeat fast to shave a few seconds off test

View File

@ -53,7 +53,7 @@ public class TestProcessCorruptBlocks {
* replicas (2) is equal to replication factor (2)) * replicas (2) is equal to replication factor (2))
*/ */
@Test @Test
public void testWhenDecreasingReplication() throws IOException { public void testWhenDecreasingReplication() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2)); conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
@ -108,7 +108,7 @@ public void testWhenDecreasingReplication() throws IOException {
* *
*/ */
@Test @Test
public void testByAddingAnExtraDataNode() throws IOException { public void testByAddingAnExtraDataNode() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2)); conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
@ -159,7 +159,7 @@ public void testByAddingAnExtraDataNode() throws IOException {
* replicas (1) is equal to replication factor (1)) * replicas (1) is equal to replication factor (1))
*/ */
@Test @Test
public void testWithReplicationFactorAsOne() throws IOException { public void testWithReplicationFactorAsOne() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2)); conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
@ -208,7 +208,7 @@ public void testWithReplicationFactorAsOne() throws IOException {
* Verify that all replicas are corrupt and 3 replicas are present. * Verify that all replicas are corrupt and 3 replicas are present.
*/ */
@Test @Test
public void testWithAllCorruptReplicas() throws IOException { public void testWithAllCorruptReplicas() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2)); conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));

View File

@ -23,6 +23,7 @@
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeoutException;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -95,7 +96,7 @@ public void testLocality() throws Exception {
} }
private void createInputs(FileSystem fs, Path inDir, String fileName) private void createInputs(FileSystem fs, Path inDir, String fileName)
throws IOException { throws IOException, TimeoutException, InterruptedException {
// create a multi-block file on hdfs // create a multi-block file on hdfs
Path path = new Path(inDir, fileName); Path path = new Path(inDir, fileName);
final short replication = 2; final short replication = 2;
@ -157,7 +158,7 @@ public RecordReader<Text, Text> getRecordReader(InputSplit split,
} }
} }
public void testMultiLevelInput() throws IOException { public void testMultiLevelInput() throws Exception {
JobConf job = new JobConf(conf); JobConf job = new JobConf(conf);
job.setBoolean("dfs.replication.considerLoad", false); job.setBoolean("dfs.replication.considerLoad", false);
@ -291,7 +292,8 @@ private BlockLocation[] mockBlockLocations(long size, long splitSize) {
} }
static void writeFile(Configuration conf, Path name, static void writeFile(Configuration conf, Path name,
short replication, int numBlocks) throws IOException { short replication, int numBlocks)
throws IOException, TimeoutException, InterruptedException {
FileSystem fileSys = FileSystem.get(conf); FileSystem fileSys = FileSystem.get(conf);
FSDataOutputStream stm = fileSys.create(name, true, FSDataOutputStream stm = fileSys.create(name, true,

View File

@ -71,13 +71,13 @@ private static String getRack(int id, int level) {
return rack.toString(); return rack.toString();
} }
public void testMultiLevelCaching() throws IOException { public void testMultiLevelCaching() throws Exception {
for (int i = 1 ; i <= MAX_LEVEL; ++i) { for (int i = 1 ; i <= MAX_LEVEL; ++i) {
testCachingAtLevel(i); testCachingAtLevel(i);
} }
} }
private void testCachingAtLevel(int level) throws IOException { private void testCachingAtLevel(int level) throws Exception {
String namenode = null; String namenode = null;
MiniDFSCluster dfs = null; MiniDFSCluster dfs = null;
MiniMRCluster mr = null; MiniMRCluster mr = null;

View File

@ -31,6 +31,7 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -449,11 +450,14 @@ static String getTaskSignalParameter(boolean isMap) {
static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
String mapSignalFile, String mapSignalFile,
String reduceSignalFile, int replication) String reduceSignalFile, int replication)
throws IOException { throws IOException, TimeoutException {
writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile), try {
(short)replication); writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile),
writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile), (short)replication);
(short)replication); writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile), (short)replication);
} catch (InterruptedException ie) {
// Ignore
}
} }
/** /**
@ -462,12 +466,16 @@ static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
boolean isMap, String mapSignalFile, boolean isMap, String mapSignalFile,
String reduceSignalFile) String reduceSignalFile)
throws IOException { throws IOException, TimeoutException {
// signal the maps to complete try {
writeFile(dfs.getNameNode(), fileSys.getConf(), // signal the maps to complete
isMap writeFile(dfs.getNameNode(), fileSys.getConf(),
? new Path(mapSignalFile) isMap
: new Path(reduceSignalFile), (short)1); ? new Path(mapSignalFile)
: new Path(reduceSignalFile), (short)1);
} catch (InterruptedException ie) {
// Ignore
}
} }
static String getSignalFile(Path dir) { static String getSignalFile(Path dir) {
@ -483,7 +491,8 @@ static String getReduceSignalFile(Path dir) {
} }
static void writeFile(NameNode namenode, Configuration conf, Path name, static void writeFile(NameNode namenode, Configuration conf, Path name,
short replication) throws IOException { short replication)
throws IOException, TimeoutException, InterruptedException {
FileSystem fileSys = FileSystem.get(conf); FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer = SequenceFile.Writer writer =
SequenceFile.createWriter(fileSys, conf, name, SequenceFile.createWriter(fileSys, conf, name,

View File

@ -23,6 +23,7 @@
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import java.util.concurrent.TimeoutException;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -278,7 +279,7 @@ public void testReinit() throws Exception {
assertFalse(rr.nextKeyValue()); assertFalse(rr.nextKeyValue());
} }
public void testSplitPlacement() throws IOException { public void testSplitPlacement() throws Exception {
MiniDFSCluster dfs = null; MiniDFSCluster dfs = null;
FileSystem fileSys = null; FileSystem fileSys = null;
try { try {
@ -678,7 +679,8 @@ public void testSplitPlacement() throws IOException {
} }
static void writeFile(Configuration conf, Path name, static void writeFile(Configuration conf, Path name,
short replication, int numBlocks) throws IOException { short replication, int numBlocks)
throws IOException, TimeoutException, InterruptedException {
FileSystem fileSys = FileSystem.get(conf); FileSystem fileSys = FileSystem.get(conf);
FSDataOutputStream stm = fileSys.create(name, true, FSDataOutputStream stm = fileSys.create(name, true,
@ -689,7 +691,8 @@ static void writeFile(Configuration conf, Path name,
// Creates the gzip file and return the FileStatus // Creates the gzip file and return the FileStatus
static FileStatus writeGzipFile(Configuration conf, Path name, static FileStatus writeGzipFile(Configuration conf, Path name,
short replication, int numBlocks) throws IOException { short replication, int numBlocks)
throws IOException, TimeoutException, InterruptedException {
FileSystem fileSys = FileSystem.get(conf); FileSystem fileSys = FileSystem.get(conf);
GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
@ -699,7 +702,8 @@ static FileStatus writeGzipFile(Configuration conf, Path name,
} }
private static void writeDataAndSetReplication(FileSystem fileSys, Path name, private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
OutputStream out, short replication, int numBlocks) throws IOException { OutputStream out, short replication, int numBlocks)
throws IOException, TimeoutException, InterruptedException {
for (int i = 0; i < numBlocks; i++) { for (int i = 0; i < numBlocks; i++) {
out.write(databuf); out.write(databuf);
} }
@ -707,7 +711,7 @@ private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
DFSTestUtil.waitReplication(fileSys, name, replication); DFSTestUtil.waitReplication(fileSys, name, replication);
} }
public void testSplitPlacementForCompressedFiles() throws IOException { public void testSplitPlacementForCompressedFiles() throws Exception {
MiniDFSCluster dfs = null; MiniDFSCluster dfs = null;
FileSystem fileSys = null; FileSystem fileSys = null;
try { try {
@ -1058,7 +1062,7 @@ public void testSplitPlacementForCompressedFiles() throws IOException {
/** /**
* Test that CFIF can handle missing blocks. * Test that CFIF can handle missing blocks.
*/ */
public void testMissingBlocks() throws IOException { public void testMissingBlocks() throws Exception {
String namenode = null; String namenode = null;
MiniDFSCluster dfs = null; MiniDFSCluster dfs = null;
FileSystem fileSys = null; FileSystem fileSys = null;