HBASE-11863 WAL files are not archived and stays in the WAL directory after splitting
This commit is contained in:
parent
b2d528aac6
commit
bb1d95385a
|
@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Distributes the task of log splitting to the available region servers.
|
* Distributes the task of log splitting to the available region servers.
|
||||||
* Coordination happens via coordination engine. For every log file that has to be split a
|
* Coordination happens via coordination engine. For every log file that has to be split a
|
||||||
|
@ -355,6 +357,11 @@ public class SplitLogManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
ConcurrentMap<String, Task> getTasks() {
|
||||||
|
return tasks;
|
||||||
|
}
|
||||||
|
|
||||||
private int activeTasks(final TaskBatch batch) {
|
private int activeTasks(final TaskBatch batch) {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (Task t : tasks.values()) {
|
for (Task t : tasks.values()) {
|
||||||
|
|
|
@ -260,6 +260,10 @@ public class TestDistributedLogSplitting {
|
||||||
}
|
}
|
||||||
LOG.info(count + " edits in " + files.length + " recovered edits files.");
|
LOG.info(count + " edits in " + files.length + " recovered edits files.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check that the log file is moved
|
||||||
|
assertFalse(fs.exists(logDir));
|
||||||
|
|
||||||
assertEquals(NUM_LOG_LINES, count);
|
assertEquals(NUM_LOG_LINES, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@ -216,7 +217,7 @@ public class TestSplitLogManager {
|
||||||
@After
|
@After
|
||||||
public void teardown() throws IOException, KeeperException {
|
public void teardown() throws IOException, KeeperException {
|
||||||
stopper.stop("");
|
stopper.stop("");
|
||||||
slm.stop();
|
if (slm != null) slm.stop();
|
||||||
TEST_UTIL.shutdownMiniZKCluster();
|
TEST_UTIL.shutdownMiniZKCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,6 +228,7 @@ public class TestSplitLogManager {
|
||||||
private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
|
private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Expr e = new Expr() {
|
Expr e = new Expr() {
|
||||||
|
@Override
|
||||||
public long eval() {
|
public long eval() {
|
||||||
return ctr.get();
|
return ctr.get();
|
||||||
}
|
}
|
||||||
|
@ -567,6 +569,44 @@ public class TestSplitLogManager {
|
||||||
assertFalse(fs.exists(emptyLogDirPath));
|
assertFalse(fs.exists(emptyLogDirPath));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 60000)
|
||||||
|
public void testLogFilesAreArchived() throws Exception {
|
||||||
|
LOG.info("testLogFilesAreArchived");
|
||||||
|
final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
|
||||||
|
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||||
|
Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
|
||||||
|
conf.set(HConstants.HBASE_DIR, dir.toString());
|
||||||
|
Path logDirPath = new Path(dir, UUID.randomUUID().toString());
|
||||||
|
fs.mkdirs(logDirPath);
|
||||||
|
// create an empty log file
|
||||||
|
String logFile = ServerName.valueOf("foo", 1, 1).toString();
|
||||||
|
fs.create(new Path(logDirPath, logFile)).close();
|
||||||
|
|
||||||
|
// spin up a thread mocking split done.
|
||||||
|
new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
boolean done = false;
|
||||||
|
while (!done) {
|
||||||
|
for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
|
||||||
|
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
|
||||||
|
SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING);
|
||||||
|
try {
|
||||||
|
ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
LOG.warn(e);
|
||||||
|
}
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}.start();
|
||||||
|
|
||||||
|
slm.splitLogDistributed(logDirPath);
|
||||||
|
|
||||||
|
assertFalse(fs.exists(logDirPath));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The following test case is aiming to test the situation when distributedLogReplay is turned off
|
* The following test case is aiming to test the situation when distributedLogReplay is turned off
|
||||||
* and restart a cluster there should no recovery regions in ZK left.
|
* and restart a cluster there should no recovery regions in ZK left.
|
||||||
|
|
Loading…
Reference in New Issue