HDFS-7898. Change TestAppendSnapshotTruncate to fail-fast. Contributed by Tsz Wo Nicholas Sze.

(cherry picked from commit e43882e84a)
This commit is contained in:
Jing Zhao 2015-03-09 10:52:17 -07:00
parent b46f9e72db
commit c7105fcff0
3 changed files with 51 additions and 24 deletions

View File

@ -429,6 +429,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7411. Change decommission logic to throttle by blocks rather than
nodes in each interval. (Andrew Wang via cdouglas)
HDFS-7898. Change TestAppendSnapshotTruncate to fail-fast.
(Tsz Wo Nicholas Sze via jing9)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -41,10 +41,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.apache.hadoop.test.GenericTestUtils;
@ -69,6 +65,9 @@ public class TestAppendSnapshotTruncate {
private static final int BLOCK_SIZE = 1024;
private static final int DATANODE_NUM = 3;
private static final short REPLICATION = 3;
private static final int FILE_WORKER_NUM = 3;
private static final long TEST_TIME_SECOND = 10;
private static final long TEST_TIMEOUT_SECOND = TEST_TIME_SECOND + 60;
static final int SHORT_HEARTBEAT = 1;
static final String[] EMPTY_STRINGS = {};
@ -106,7 +105,7 @@ public static void tearDown() throws IOException {
/** Test randomly mixing append, snapshot and truncate operations. */
@Test
@Test(timeout=TEST_TIMEOUT_SECOND*1000)
public void testAST() throws Exception {
final String dirPathString = "/dir";
final Path dir = new Path(dirPathString);
@ -121,12 +120,12 @@ public void testAST() throws Exception {
}
localDir.mkdirs();
final DirWorker w = new DirWorker(dir, localDir, 3);
final DirWorker w = new DirWorker(dir, localDir, FILE_WORKER_NUM);
w.startAllFiles();
w.start();
Worker.sleep(10L*1000);
Worker.sleep(TEST_TIME_SECOND * 1000);
w.stop();
w.stoptAllFiles();
w.stopAllFiles();
w.checkEverything();
}
@ -259,7 +258,7 @@ void startAllFiles() {
}
}
void stoptAllFiles() throws InterruptedException {
void stopAllFiles() throws InterruptedException {
for(FileWorker f : files) {
f.stop();
}
@ -269,12 +268,12 @@ void checkEverything() throws IOException {
LOG.info("checkEverything");
for(FileWorker f : files) {
f.checkFullFile();
Preconditions.checkState(f.state.get() != State.ERROR);
f.checkErrorState();
}
for(String snapshot : snapshotPaths.keySet()) {
checkSnapshot(snapshot);
}
Preconditions.checkState(state.get() != State.ERROR);
checkErrorState();
}
}
@ -364,7 +363,7 @@ private boolean truncate(long newLength, StringBuilder b) throws IOException {
b.append(", newLength=").append(newLength)
.append(", isReady=").append(isReady);
if (!isReady) {
TestFileTruncate.checkBlockRecovery(file, dfs);
TestFileTruncate.checkBlockRecovery(file, dfs, 100, 300L);
}
return isReady;
}
@ -407,6 +406,7 @@ enum State {
IDLE(false), RUNNING(false), STOPPED(true), ERROR(true);
final boolean isTerminated;
State(boolean isTerminated) {
this.isTerminated = isTerminated;
}
@ -416,11 +416,29 @@ enum State {
final AtomicReference<State> state = new AtomicReference<State>(State.IDLE);
final AtomicBoolean isCalling = new AtomicBoolean();
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
private Throwable thrown = null;
Worker(String name) {
this.name = name;
}
State checkErrorState() {
final State s = state.get();
if (s == State.ERROR) {
throw new IllegalStateException(name + " has " + s, thrown);
}
return s;
}
void setErrorState(Throwable t) {
checkErrorState();
LOG.error("Worker " + name + " failed.", t);
state.set(State.ERROR);
thrown = t;
}
void start() {
Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING));
@ -429,14 +447,13 @@ void start() {
@Override
public void run() {
final Random r = DFSUtil.getRandom();
for(State s; (s = state.get()) == State.RUNNING || s == State.IDLE;) {
for(State s; !(s = checkErrorState()).isTerminated;) {
if (s == State.RUNNING) {
isCalling.set(true);
try {
LOG.info(call());
} catch (Exception e) {
LOG.error("Worker " + name + " failed.", e);
state.set(State.ERROR);
} catch(Throwable t) {
setErrorState(t);
return;
}
isCalling.set(false);
@ -451,7 +468,11 @@ public void run() {
}
boolean isPaused() {
return state.get() == State.IDLE && !isCalling.get();
final State s = checkErrorState();
if (s == State.STOPPED) {
throw new IllegalStateException(name + " is " + s);
}
return s == State.IDLE && !isCalling.get();
}
void pause() {
@ -459,9 +480,7 @@ void pause() {
}
void stop() throws InterruptedException {
if (state.get() == State.ERROR) {
return;
}
checkErrorState();
state.set(State.STOPPED);
thread.get().join();

View File

@ -1151,8 +1151,13 @@ static void checkBlockRecovery(Path p) throws IOException {
public static void checkBlockRecovery(Path p, DistributedFileSystem dfs)
throws IOException {
checkBlockRecovery(p, dfs, SUCCESS_ATTEMPTS, SLEEP);
}
public static void checkBlockRecovery(Path p, DistributedFileSystem dfs,
int attempts, long sleepMs) throws IOException {
boolean success = false;
for(int i = 0; i < SUCCESS_ATTEMPTS; i++) {
for(int i = 0; i < attempts; i++) {
LocatedBlocks blocks = getLocatedBlocks(p, dfs);
boolean noLastBlock = blocks.getLastLocatedBlock() == null;
if(!blocks.isUnderConstruction() &&
@ -1160,9 +1165,9 @@ public static void checkBlockRecovery(Path p, DistributedFileSystem dfs)
success = true;
break;
}
try { Thread.sleep(SLEEP); } catch (InterruptedException ignored) {}
try { Thread.sleep(sleepMs); } catch (InterruptedException ignored) {}
}
assertThat("inode should complete in ~" + SLEEP * SUCCESS_ATTEMPTS + " ms.",
assertThat("inode should complete in ~" + sleepMs * attempts + " ms.",
success, is(true));
}