HDFS-7898. Change TestAppendSnapshotTruncate to fail-fast. Contributed by Tsz Wo Nicholas Sze.

(cherry picked from commit e43882e84a)
(cherry picked from commit c7105fcff0)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
Jing Zhao 2015-03-09 10:52:17 -07:00
parent 9d1f67f2f2
commit 368ab2cd37
3 changed files with 51 additions and 24 deletions

View File

@ -417,6 +417,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7411. Change decommission logic to throttle by blocks rather than HDFS-7411. Change decommission logic to throttle by blocks rather than
nodes in each interval. (Andrew Wang via cdouglas) nodes in each interval. (Andrew Wang via cdouglas)
HDFS-7898. Change TestAppendSnapshotTruncate to fail-fast.
(Tsz Wo Nicholas Sze via jing9)
HDFS-6806. HDFS Rolling upgrade document should mention the versions HDFS-6806. HDFS Rolling upgrade document should mention the versions
available. (J.Andreina via aajisaka) available. (J.Andreina via aajisaka)

View File

@ -41,10 +41,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate; import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -69,6 +65,9 @@ public class TestAppendSnapshotTruncate {
private static final int BLOCK_SIZE = 1024; private static final int BLOCK_SIZE = 1024;
private static final int DATANODE_NUM = 3; private static final int DATANODE_NUM = 3;
private static final short REPLICATION = 3; private static final short REPLICATION = 3;
private static final int FILE_WORKER_NUM = 3;
private static final long TEST_TIME_SECOND = 10;
private static final long TEST_TIMEOUT_SECOND = TEST_TIME_SECOND + 60;
static final int SHORT_HEARTBEAT = 1; static final int SHORT_HEARTBEAT = 1;
static final String[] EMPTY_STRINGS = {}; static final String[] EMPTY_STRINGS = {};
@ -106,7 +105,7 @@ public class TestAppendSnapshotTruncate {
/** Test randomly mixing append, snapshot and truncate operations. */ /** Test randomly mixing append, snapshot and truncate operations. */
@Test @Test(timeout=TEST_TIMEOUT_SECOND*1000)
public void testAST() throws Exception { public void testAST() throws Exception {
final String dirPathString = "/dir"; final String dirPathString = "/dir";
final Path dir = new Path(dirPathString); final Path dir = new Path(dirPathString);
@ -121,12 +120,12 @@ public class TestAppendSnapshotTruncate {
} }
localDir.mkdirs(); localDir.mkdirs();
final DirWorker w = new DirWorker(dir, localDir, 3); final DirWorker w = new DirWorker(dir, localDir, FILE_WORKER_NUM);
w.startAllFiles(); w.startAllFiles();
w.start(); w.start();
Worker.sleep(10L*1000); Worker.sleep(TEST_TIME_SECOND * 1000);
w.stop(); w.stop();
w.stoptAllFiles(); w.stopAllFiles();
w.checkEverything(); w.checkEverything();
} }
@ -259,7 +258,7 @@ public class TestAppendSnapshotTruncate {
} }
} }
void stoptAllFiles() throws InterruptedException { void stopAllFiles() throws InterruptedException {
for(FileWorker f : files) { for(FileWorker f : files) {
f.stop(); f.stop();
} }
@ -269,12 +268,12 @@ public class TestAppendSnapshotTruncate {
LOG.info("checkEverything"); LOG.info("checkEverything");
for(FileWorker f : files) { for(FileWorker f : files) {
f.checkFullFile(); f.checkFullFile();
Preconditions.checkState(f.state.get() != State.ERROR); f.checkErrorState();
} }
for(String snapshot : snapshotPaths.keySet()) { for(String snapshot : snapshotPaths.keySet()) {
checkSnapshot(snapshot); checkSnapshot(snapshot);
} }
Preconditions.checkState(state.get() != State.ERROR); checkErrorState();
} }
} }
@ -364,7 +363,7 @@ public class TestAppendSnapshotTruncate {
b.append(", newLength=").append(newLength) b.append(", newLength=").append(newLength)
.append(", isReady=").append(isReady); .append(", isReady=").append(isReady);
if (!isReady) { if (!isReady) {
TestFileTruncate.checkBlockRecovery(file, dfs); TestFileTruncate.checkBlockRecovery(file, dfs, 100, 300L);
} }
return isReady; return isReady;
} }
@ -407,6 +406,7 @@ public class TestAppendSnapshotTruncate {
IDLE(false), RUNNING(false), STOPPED(true), ERROR(true); IDLE(false), RUNNING(false), STOPPED(true), ERROR(true);
final boolean isTerminated; final boolean isTerminated;
State(boolean isTerminated) { State(boolean isTerminated) {
this.isTerminated = isTerminated; this.isTerminated = isTerminated;
} }
@ -417,10 +417,28 @@ public class TestAppendSnapshotTruncate {
final AtomicBoolean isCalling = new AtomicBoolean(); final AtomicBoolean isCalling = new AtomicBoolean();
final AtomicReference<Thread> thread = new AtomicReference<Thread>(); final AtomicReference<Thread> thread = new AtomicReference<Thread>();
private Throwable thrown = null;
Worker(String name) { Worker(String name) {
this.name = name; this.name = name;
} }
State checkErrorState() {
final State s = state.get();
if (s == State.ERROR) {
throw new IllegalStateException(name + " has " + s, thrown);
}
return s;
}
void setErrorState(Throwable t) {
checkErrorState();
LOG.error("Worker " + name + " failed.", t);
state.set(State.ERROR);
thrown = t;
}
void start() { void start() {
Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING)); Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING));
@ -429,14 +447,13 @@ public class TestAppendSnapshotTruncate {
@Override @Override
public void run() { public void run() {
final Random r = DFSUtil.getRandom(); final Random r = DFSUtil.getRandom();
for(State s; (s = state.get()) == State.RUNNING || s == State.IDLE;) { for(State s; !(s = checkErrorState()).isTerminated;) {
if (s == State.RUNNING) { if (s == State.RUNNING) {
isCalling.set(true); isCalling.set(true);
try { try {
LOG.info(call()); LOG.info(call());
} catch (Exception e) { } catch(Throwable t) {
LOG.error("Worker " + name + " failed.", e); setErrorState(t);
state.set(State.ERROR);
return; return;
} }
isCalling.set(false); isCalling.set(false);
@ -451,7 +468,11 @@ public class TestAppendSnapshotTruncate {
} }
boolean isPaused() { boolean isPaused() {
return state.get() == State.IDLE && !isCalling.get(); final State s = checkErrorState();
if (s == State.STOPPED) {
throw new IllegalStateException(name + " is " + s);
}
return s == State.IDLE && !isCalling.get();
} }
void pause() { void pause() {
@ -459,9 +480,7 @@ public class TestAppendSnapshotTruncate {
} }
void stop() throws InterruptedException { void stop() throws InterruptedException {
if (state.get() == State.ERROR) { checkErrorState();
return;
}
state.set(State.STOPPED); state.set(State.STOPPED);
thread.get().join(); thread.get().join();

View File

@ -1151,8 +1151,13 @@ public class TestFileTruncate {
public static void checkBlockRecovery(Path p, DistributedFileSystem dfs) public static void checkBlockRecovery(Path p, DistributedFileSystem dfs)
throws IOException { throws IOException {
checkBlockRecovery(p, dfs, SUCCESS_ATTEMPTS, SLEEP);
}
public static void checkBlockRecovery(Path p, DistributedFileSystem dfs,
int attempts, long sleepMs) throws IOException {
boolean success = false; boolean success = false;
for(int i = 0; i < SUCCESS_ATTEMPTS; i++) { for(int i = 0; i < attempts; i++) {
LocatedBlocks blocks = getLocatedBlocks(p, dfs); LocatedBlocks blocks = getLocatedBlocks(p, dfs);
boolean noLastBlock = blocks.getLastLocatedBlock() == null; boolean noLastBlock = blocks.getLastLocatedBlock() == null;
if(!blocks.isUnderConstruction() && if(!blocks.isUnderConstruction() &&
@ -1160,9 +1165,9 @@ public class TestFileTruncate {
success = true; success = true;
break; break;
} }
try { Thread.sleep(SLEEP); } catch (InterruptedException ignored) {} try { Thread.sleep(sleepMs); } catch (InterruptedException ignored) {}
} }
assertThat("inode should complete in ~" + SLEEP * SUCCESS_ATTEMPTS + " ms.", assertThat("inode should complete in ~" + sleepMs * attempts + " ms.",
success, is(true)); success, is(true));
} }