HBASE-11863 WAL files are not archived and stays in the WAL directory after splitting

This commit is contained in:
Enis Soztutar 2014-09-02 18:11:38 -07:00
parent 5e656f85df
commit 4b2f43fafb
3 changed files with 52 additions and 1 deletions

View File

@ -78,6 +78,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
/**
* Distributes the task of log splitting to the available region servers.
* Coordination happens via zookeeper. For every log file that has to be split a
@ -437,6 +439,11 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
@VisibleForTesting
ConcurrentMap<String, Task> getTasks() {
return tasks;
}
private int activeTasks(final TaskBatch batch) {
int count = 0;
for (Task t: tasks.values()) {

View File

@ -260,6 +260,10 @@ public class TestDistributedLogSplitting {
}
LOG.info(count + " edits in " + files.length + " recovered edits files.");
}
// check that the log file is moved
assertFalse(fs.exists(logDir));
assertEquals(NUM_LOG_LINES, count);
}

View File

@ -40,6 +40,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@ -148,7 +149,7 @@ public class TestSplitLogManager {
@After
public void teardown() throws IOException, KeeperException {
stopper.stop("");
slm.stop();
if (slm != null) slm.stop();
TEST_UTIL.shutdownMiniZKCluster();
}
@ -159,6 +160,7 @@ public class TestSplitLogManager {
private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
throws Exception {
Expr e = new Expr() {
@Override
public long eval() {
return ctr.get();
}
@ -500,6 +502,44 @@ public class TestSplitLogManager {
assertFalse(fs.exists(emptyLogDirPath));
}
@Test (timeout = 60000)
public void testLogFilesAreArchived() throws Exception {
LOG.info("testLogFilesAreArchived");
final SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
conf.set(HConstants.HBASE_DIR, dir.toString());
Path logDirPath = new Path(dir, UUID.randomUUID().toString());
fs.mkdirs(logDirPath);
// create an empty log file
String logFile = ServerName.valueOf("foo", 1, 1).toString();
fs.create(new Path(logDirPath, logFile)).close();
// spin up a thread mocking split done.
new Thread() {
@Override
public void run() {
boolean done = false;
while (!done) {
for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING);
try {
ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
} catch (KeeperException e) {
LOG.warn(e);
}
done = true;
}
}
};
}.start();
slm.splitLogDistributed(logDirPath);
assertFalse(fs.exists(logDirPath));
}
/**
* The following test case is aiming to test the situation when distributedLogReplay is turned off
* and restart a cluster there should no recovery regions in ZK left.